[PYTHON] MemSQL processing capacity verification (Application 1)

This time, we will verify the SQL processing power of MemSQL.

We have verified MemSQL several times in the past, but this time, somewhat (quite...) forcibly, I would like to test it under conditions closer to real-world processing. MemSQL is an in-memory database with very high MySQL compatibility, but where does its high transactional performance actually pay off? On that point, we sometimes hear comments that such fast processing power simply is not needed. So this time we verify MemSQL **as a target-side data source of Equalum**, which lets anyone build real-time streaming processing, and explore its potential as a knowledge-base layer: feeding BI and AI that utilize the data delivered there, and in the future enabling high-speed simulation for the robot-oriented systems whose boundaries with AI will continue to blur.

(Screenshot: 2020-10-07 9.12.50.png)

Assumed scenario

The scenario: sales data arrives on MemSQL in real time, and then... (1) a batch SQL job runs at regular intervals to extract the required information, and (2) the extracted results are automatically materialized as a new table, which is handed off to an external process (R is assumed) that makes use of it.
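As a sketch of steps (1) and (2), the periodic extraction and materialization could look like the SQL below, built here as Python strings. The table name `Qiita_Test`, the result table `Qiita_Result`, and the `ts` column are assumptions based on the scripts later in this article, not the author's final SQL:

```python
# Sketch of the periodic batch step: pull the last interval's rows and
# materialize them as a new table. Names (Qiita_Test, Qiita_Result, ts)
# are assumptions based on the scripts below.
INTERVAL_SEC = 60

JOB_SQL = ("SELECT DISTINCT * FROM Qiita_Test "
           "WHERE ts >= DATE_SUB(NOW(), INTERVAL %d SECOND) "
           "ORDER BY ts" % INTERVAL_SEC)

# MemSQL is MySQL-compatible, so a CREATE TABLE ... AS SELECT
# can materialize the extracted result as a new table
MATERIALIZE_SQL = "CREATE TABLE Qiita_Result AS " + JOB_SQL

print(JOB_SQL)
print(MATERIALIZE_SQL)
```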

The point is that the data accumulated on MemSQL arrives there **under Equalum's control**, so: (1) no matter how heavily SQL is used, no **"transaction-stealing"** load is placed on the original data providers; (2) the original data is **maintained and managed as usual** on the existing, well-seasoned, stably operating databases, so the MemSQL side can be built purely to performance-oriented data-computing specifications; and (3) data extracted from different upstream databases can be queried transparently with SQL on MemSQL, so there is **no need to forcibly integrate the upstream data-source layer**. Such advantages should emerge...! That, at least, is the (urban-legend-grade?) hypothesis.

(Screenshot: 2020-10-07 9.13.28.png)

Advance preparation

First, we create a mechanism in Python that runs a task automatically at regular intervals. This time we go with the **now-nostalgic (?) version 2.7**, prioritizing **"getting it working"** (a port to version 3 will follow later, time permitting... (sweat)).

# coding: utf-8

# Execute a task in Python at regular intervals (brute-force version)
# Version 2.7


import sys
stdout = sys.stdout
reload(sys)
sys.setdefaultencoding('utf-8')
sys.stdout = stdout

# Module to import
import schedule
import time


# Set the process to run at regular intervals here


def job():

    print("********************************")
    print("Executing the specified job")
    print("********************************")

    from datetime import datetime
    print("JOB start date and time: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))

    # NOTE: the time-window condition needs a column comparison on ts
    JOB_SQL = "SELECT DISTINCT * FROM Test_Table WHERE ts >= DATE_SUB(NOW(), INTERVAL 60 SECOND) AND Category = 'Test_Category' ORDER BY ts"
    print JOB_SQL

    print("********************************")
    print("The specified job has been executed")
    print("********************************")
    print


# Main part from here


def main():

    # Variables to use
    Loop_Count = 3
    Count = 0

    Interval_Time = 60

    # Start time of the whole process
    from datetime import datetime
    print("Program start date and time: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
    print

    # Every 10 minutes
    # schedule.every(10).minutes.do(job)
    # Every 2 hours
    # schedule.every(2).hours.do(job)
    # At 10:00 every day
    # schedule.every().day.at("10:00").do(job)
    # Every Monday
    # schedule.every().monday.do(job)

    schedule.every(Interval_Time).seconds.do(job)

    # Run the processing in an infinite loop
    while True:

        schedule.run_pending()

        # Mysterious spell... ww
        time.sleep(Interval_Time)

        # Check against the specified number of runs
        if (Count >= Loop_Count):

            break

        else:

            Count += 1

    print(str(Count) + " runs: the specified number of jobs have been completed!")
    print
    from datetime import datetime
    print("Program end date and time: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
       
if __name__ == "__main__":
    main()
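The author mentions porting this to Python 3 later. As a minimal, stdlib-only Python 3 sketch of the same fixed-count interval loop (no `schedule` dependency; `run_at_intervals` is a hypothetical helper name, not part of the article's scripts):

```python
# Python 3 sketch: run a job a fixed number of times at a fixed interval,
# using only the standard library.
import time
from datetime import datetime

def run_at_intervals(job, interval_sec, loop_count):
    # Run `job` every `interval_sec` seconds, `loop_count` times in total.
    results = []
    for count in range(1, loop_count + 1):
        results.append(job())
        print("%d runs: job completed at %s"
              % (count, datetime.now().strftime("%Y/%m/%d %H:%M:%S")))
        if count < loop_count:
            time.sleep(interval_sec)
    return results

# Example: three runs with a 1-second interval
run_at_intervals(lambda: "ok", 1, 3)
```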

Let's quickly check that it works.

(Screenshot: execution results)

Create an original data table on MemSQL

Next, create the table on MemSQL into which the original data will be inserted. This, too, is brute force, but we set it up in Python as follows.


# coding: utf-8

# Create a centralized sales data table on MemSQL
# Version 2.7


# Initial setting
import sys
stdout = sys.stdout
reload(sys)
sys.setdefaultencoding('utf-8')
sys.stdout = stdout

import time

# You can take advantage of this with MySQL compatibility with MemSQL!
import pymysql.cursors

# Initialize an existing table
Table_Init = "DROP TABLE IF EXISTS Qiita_Test"
Table_Make = "CREATE TABLE IF NOT EXISTS Qiita_Test" 
    
# Table definition (thrown together in one go, for the time being)
DC0 = "id BIGINT AUTO_INCREMENT, ts TIMESTAMP(6), PRIMARY KEY(id, ts), "
DC1 = "Category VARCHAR(20), Product VARCHAR(20), Price INT, Units INT, "
DC2 = "Card VARCHAR(40), Number VARCHAR(30), Payment INT, Tax INT, "
DC3 = "User VARCHAR(20), Zip VARCHAR(10), Prefecture VARCHAR(10), Address VARCHAR(60), Tel VARCHAR(15), Email VARCHAR(40)"

# Start processing
print("****************************************************")
print("Create a centralized sales table for validation on MemSQL")
print("****************************************************")

from datetime import datetime
print("Start date and time of table creation process: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
print

try:

    # Connect to MemSQL
    db = pymysql.connect(host = 'xxx.xxx.xxx.xxx',
                         port=3306,
                         user='qiita',
                         password='adminqiita',
                         db='Test',
                         charset='utf8',
                         cursorclass=pymysql.cursors.DictCursor)

    with db.cursor() as cursor:

        # Initialize the existing table
        cursor.execute(Table_Init)
        db.commit()

        # Create the new table
        cursor.execute(Table_Make+"("+DC0+DC1+DC2+DC3+")" )
        db.commit()

except KeyboardInterrupt:
    print("************************")
    print('!!!!! Interrupt occurred !!!!!')
    print("************************")
    print

finally:
    # Close the database connection
    db.close()
    print("****************************************************************")
    print("Creation of the centralized sales table for validation on MemSQL finished")
    print("****************************************************************")
    print("End date and time of table creation process: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
    print
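For reference, the DDL that the script assembles can be reproduced and inspected without a database connection, using the same string fragments as above:

```python
# Reassemble the CREATE TABLE statement from the same fragments as the script above
Table_Make = "CREATE TABLE IF NOT EXISTS Qiita_Test"
DC0 = "id BIGINT AUTO_INCREMENT, ts TIMESTAMP(6), PRIMARY KEY(id, ts), "
DC1 = "Category VARCHAR(20), Product VARCHAR(20), Price INT, Units INT, "
DC2 = "Card VARCHAR(40), Number VARCHAR(30), Payment INT, Tax INT, "
DC3 = "User VARCHAR(20), Zip VARCHAR(10), Prefecture VARCHAR(10), Address VARCHAR(60), Tel VARCHAR(15), Email VARCHAR(40)"

ddl = Table_Make + "(" + DC0 + DC1 + DC2 + DC3 + ")"
print(ddl)
```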

Forcibly generate and insert verification data using Faker

For this verification as well, we generate somewhat "naughty" data using the well-known Python library Faker. The brute-force setup below prioritizes getting data generated quickly. In the actual verification, we let this script run continuously to grow the main table on MemSQL, while throwing several SQL patterns at that table and processing the results.
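One detail in the script below worth isolating is the prefecture extraction from the generated (Japanese) addresses. A stand-alone sketch, with hypothetical sample addresses, of the pattern it relies on:

```python
# -*- coding: utf-8 -*-
import re

# Prefecture pattern for Japanese addresses:
# Tokyo-to / Hokkaido / Kyoto-fu, Osaka-fu / 2-3 character ...-ken
pattern = u"東京都|北海道|(?:京都|大阪)府|.{2,3}県"

def extract_prefecture(address):
    # Return the leading prefecture, or an empty string if no match
    m = re.match(pattern, address)
    return m.group() if m else u""

# Hypothetical sample addresses
print(extract_prefecture(u"東京都港区芝公園4-2-8"))
print(extract_prefecture(u"神奈川県横浜市西区1-1"))
```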


# coding: utf-8

# Continuously insert data into a centralized sales table on MemSQL
# Version 2.7


# Initial setting
import sys
stdout = sys.stdout
reload(sys)
sys.setdefaultencoding('utf-8')
sys.stdout = stdout

import time
import pymysql.cursors
import re

# Instruction settings used in SQL
SQL_Head = "INSERT INTO Qiita_Test"

# Metadata definitions used for validation
Category_Name = ["Alcoholic beverages", "Home appliances", "Books", "DVD / CD", "Miscellaneous goods"]
Product_Name0 = ["Sake", "Bourbon", "Beer", "Imo Shochu", "Red Wine", "White Wine", "Scotch", "Brandy", "Awamori", "Tequila"]
Product_Price0 = [1980, 2500, 490, 2000, 3000, 2500, 3500, 5000, 1980, 2000]
Product_Name1 = ["TV", "Washing Machine", "Radio", "Stereo", "Microwave Oven", "PC", "Battery", "Air Conditioner", "Dryer", "Vacuum Cleaner"]
Product_Price1 = [49800, 39800, 2980, 88000, 29800, 64800, 198, 64800, 35800, 24800]
Product_Name2 = ["Weekly magazine", "History", "Photobook", "Cartoon", "Reference book", "Fiction", "Economy", "Self-development", "Monthly magazine", "New issue"]
Product_Price2 = [280, 1500, 2500, 570, 1480, 1400, 1800, 1540, 980, 1980]
Product_Name3 = ["Western music", "Enka", "J-pop", "Western movies", "Idol", "Classical", "Japanese movies", "Serial drama", "Planning", "Anime"]
Product_Price3 = [1980, 2200, 2500, 3500, 2980, 1980, 3800, 2690, 1980, 2400]
Product_Name4 = ["Detergent", "Light bulb", "Gift", "Quasi-drug", "Pet food", "Batteries", "Stationery", "Men's goods", "Women's goods", "Seasonal goods"]
Product_Price4 = [498, 198, 1980, 398, 980, 248, 398, 2980, 3580, 1980]


# Data column for writing (combined with table generation)
DL1 = "Category, Product, Price, Units, "
DL2 = "Card, Number, Payment, Tax, "
DL3 = "User, Zip, Prefecture, Address, Tel, Email"

# Demo processing started
print("********************************************************************")
print("Start automatic generation & insertion of data into the centralized sales table on MemSQL")
print("********************************************************************")
print

from datetime import datetime
print("Start date and time of data insertion process: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
print

try:

    # Set up Python's automatic data generation (Faker)
    from faker.factory import Factory
    Faker = Factory.create
    fakegen = Faker("ja_JP")
    fakegen.seed(0)  # seed after creating the ja_JP instance so it actually takes effect

    # Various variable definitions
    # Total number of rows to generate -> change here as appropriate
    ##########################################################
    Loop_Count = 10000
    ##########################################################
    # Timing adjustment flag (0: no wait 1: 1 second 2: random)
    ##########################################################
    Wait_Flag = 2
    ##########################################################
    # A few options are provided to make this more demo-like later.

    # Fixed interval (in seconds of system time)
    Sleep_Wait = 1
    # Random interval (adjust to the actual situation)
    Base_Count = 500000

    # Other variables
    Counter = 0
    Work_Count = 1

 #Connect with MemSQL
    db = pymysql.connect(host = 'xxx.xxx.xxx.xxx',
                         port=3306,
                         user='qiita',
                         password='adminqiita',
                         db='Test',
                         charset='utf8',
                         cursorclass=pymysql.cursors.DictCursor)
    
    with db.cursor() as cursor:
    
        # Generate verification data
        while Counter < Loop_Count:

            # Randomly select the product category to write (5 types this time, so force into 0-4)
            Category_ID = fakegen.random_digit()
            if Category_ID > 4: Category_ID = Category_ID - 5

            # Set the category name
            Category = Category_Name[Category_ID]

            # Select one of the 10 products in each category
            Product_ID = fakegen.random_digit()

            # Select the column information that meets the conditions
            # Quantities are adjusted accordingly
            if Category_ID == 0:
                Product = Product_Name0[Product_ID]
                Price = Product_Price0[Product_ID]
                Units = fakegen.random_digit() + 1
                
            elif Category_ID == 1:
                Product = Product_Name1[Product_ID]
                Price = Product_Price1[Product_ID]
                Units = 1
                
            elif Category_ID == 2:
                Product = Product_Name2[Product_ID]
                Price = Product_Price2[Product_ID]
                Units = fakegen.random_digit() + 1
                if Units >3: Units = 3  
                    
            elif Category_ID == 3:
                Product = Product_Name3[Product_ID]
                Price = Product_Price3[Product_ID]
                Units = fakegen.random_digit() + 1
                if Units >2: Units = 2
                    
            else:
                Product = Product_Name4[Product_ID]
                Price = Product_Price4[Product_ID]
                Units = fakegen.random_digit() + 1
                if Units >4: Units = 4
                
            # Payment information
            if fakegen.pybool():
                Card = "Cash"
            else:
                Card = fakegen.credit_card_provider()

            Number = fakegen.credit_card_number()
            if Card == "Cash": Number = "N/A"
            
            # Calculate the total payment and the 10% sales tax (as an integer, to match the INT column)
            Payment = Price * Units
            Tax = int(Payment * 0.1)

            # Generate purchaser information
            User = fakegen.name()
            Zip = fakegen.zipcode()
            Address = fakegen.address()

            # Extract the prefecture (the generated addresses are Japanese, so match a Japanese pattern)
            pattern = u"東京都|北海道|(?:京都|大阪)府|.{2,3}県"
            m = re.match(pattern, Address)
            if m:
                Prefecture = m.group()
            else:
                Prefecture = ""

            Tel = fakegen.phone_number()
            Email = fakegen.ascii_email()
       
            # Write to the regulation table of the database from here
            DV1 = Category+"','"+Product+"','"+str(Price)+"','"+str(Units)+"','"
            DV2 = Card+"','"+Number+"','"+str(Payment)+"','"+str(Tax)+"','"
            DV3 = User+"','"+Zip+"','"+Prefecture+"','"+Address+"','"+Tel+"','"+str(Email)

            SQL_Data = SQL_Head +"("+DL1+DL2+DL3+") VALUES('"+DV1+DV2+DV3+"')"

            cursor.execute(SQL_Data)
            db.commit()

            # Show the generated data on the console (comment out if unnecessary)
            print SQL_Data
            print
            
            # Adjust the generation interval
            if Wait_Flag == 1:
                time.sleep(Sleep_Wait)
            elif Wait_Flag == 2:
                Wait_Loop = Base_Count * fakegen.random_digit() + 1
                for i in range(Wait_Loop): Work_Count = Work_Count + i

            # Update the loop counter
            Counter = Counter + 1

except KeyboardInterrupt:
    print("************************")
    print('!!!!! Interrupt occurred !!!!!')
    print("************************")
    print

finally:
    # Close the database connection
    db.close()
    print("**************************************")
    print("Total number of generated rows: " + str(Counter))
    print("**************************************")
    print
    print("************************************************************************")
    print("Automatic generation & insertion of data into the centralized sales table on MemSQL finished")
    print("************************************************************************")
    print("End date and time of data insertion process: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
    print
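A side note on the design: the script builds the INSERT by string concatenation, which is fine for a controlled demo but breaks as soon as a generated value contains a quote character (not unlikely for names and addresses). A parameterized variant is sketched below; with pymysql, `cursor.execute(sql, params)` handles quoting and escaping. The helper `insert_row` is a hypothetical name, not part of the article's scripts:

```python
# Parameterized INSERT sketch (column list matches the table created earlier)
COLUMNS = ("Category, Product, Price, Units, "
           "Card, Number, Payment, Tax, "
           "User, Zip, Prefecture, Address, Tel, Email")
PLACEHOLDERS = ", ".join(["%s"] * 14)
SQL_INSERT = "INSERT INTO Qiita_Test (" + COLUMNS + ") VALUES (" + PLACEHOLDERS + ")"

def insert_row(cursor, row):
    # row: a 14-tuple in column order; cursor: a pymysql cursor.
    # The driver substitutes each %s safely, so quotes in values cannot
    # break the statement.
    cursor.execute(SQL_INSERT, row)

print(SQL_INSERT)
```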


Check the operation, just in case.

(Screenshot: execution results)

Also check the contents of the table.

(Screenshot: table contents)

Run a quick verification

Change the **job()** part of the script prepared earlier as follows. (The part that handles the query results is designed with future verification in mind.)


    # Start processing
    from datetime import datetime
    print("JOB execution date and time: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
    print

    # Connect to MemSQL
    db = pymysql.connect(host = 'xxx.xxx.xxx.xxx',
                         port=3306,
                         user='qiita',
                         password='adminqiita',
                         db='Test',
                         charset='utf8',
                         cursorclass=pymysql.cursors.DictCursor)

    with db.cursor() as cursor:

        # Set a query that counts the rows inserted so far
        SQL_Data = "SELECT Count(*) FROM Qiita_Test"

        # Send the query and commit
        cursor.execute(SQL_Data)
        db.commit()

        # Get the query results
        rows = cursor.fetchall()

        # Initialize the working buffer
        Tmp_Data = []

        # The query contents will grow in the future, so prepare for that...
        for Query_Data in rows:

            for item in Query_Data.values():

                Tmp_Data.append(item)

                print("Number of rows at this point: " + str(Tmp_Data[0]))
                print

    db.close()
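Because the connection uses pymysql's `DictCursor`, `fetchall()` returns a list of dicts keyed by column expression. The result handling above can be exercised stand-alone with a simulated row (the count value below is hypothetical):

```python
# Simulated fetchall() result under pymysql.cursors.DictCursor
rows = [{"Count(*)": 12345}]  # hypothetical count value

# Same unpacking loop as in the modified job()
Tmp_Data = []
for Query_Data in rows:
    for item in Query_Data.values():
        Tmp_Data.append(item)

print("Number of rows at this point: " + str(Tmp_Data[0]))
```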

For this verification, the variables are set so that the job runs three times at 5-second intervals. For the data generation side, a random interval is set, aiming for something a little more realistic; that script is started first, and then the modified script is executed without a pause.

(Screenshot: execution results)

It seems to have worked without incident, so the preparation for this round ends here. Next time, I will expand the contents of the regularly processed table and of the **job()** created this time.

** Continue to MemSQL processing capacity verification (Application 2) ... **

Acknowledgments

This verification is carried out using the official Free version (V6) of MemSQL.

** About the Free version of MemSQL ... **

We would like to thank MemSQL for providing this valuable opportunity. If this content differs from what is published on MemSQL's official website, the official MemSQL information takes precedence; please bear that in mind.
