I have verified MemSQL several times in the past, but this time, although the approach is still somewhat (quite ...) forced, I would like to verify it in a way that is closer to real-world processing. MemSQL offers very high MySQL compatibility as an in-memory database, but where can its high transactional performance actually be put to work? On that point, we sometimes receive comments that such fast processing power simply is not needed. So this time I will verify MemSQL **as a target-side data source of Equalum**, which lets anyone build real-time streaming processing, and explore its potential as a knowledge-base-like position: one that powers the BI and AI making use of the data gathered there, and that in the future could enable high-speed simulation for the various robot-type systems whose boundaries with AI will keep dissolving.
Consider a situation where sales information arrives on MemSQL in real time ... (1) a batch SQL process runs at fixed time intervals to extract the required information, and (2) the extracted result is automatically materialized as a new table, which is then handed over to the external processing (R is assumed here) that makes use of it.
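As a rough sketch of steps (1) and (2), the periodic job could materialize the rows from the last interval into a fresh work table that R can then read. This is only a minimal illustration: the table names (Sales_Stream, Sales_Work) and the connection details are hypothetical, and it simply leans on MemSQL's MySQL compatibility (pymysql plus CREATE TABLE ... AS SELECT).

```python
# Minimal sketch, not the final script: materialize the last 60 seconds
# of incoming rows into a work table for downstream processing in R.
# Sales_Stream / Sales_Work and the connection details are hypothetical.
import pymysql.cursors

db = pymysql.connect(host='xxx.xxx.xxx.xxx', port=3306,
                     user='qiita', password='adminqiita', db='Test',
                     charset='utf8', cursorclass=pymysql.cursors.DictCursor)
try:
    with db.cursor() as cursor:
        cursor.execute("DROP TABLE IF EXISTS Sales_Work")
        # MemSQL accepts MySQL-style CREATE TABLE ... AS SELECT
        cursor.execute("CREATE TABLE Sales_Work AS "
                       "SELECT * FROM Sales_Stream "
                       "WHERE ts >= DATE_SUB(NOW(), INTERVAL 60 SECOND)")
        db.commit()
finally:
    db.close()
```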
**The point is that the data accumulated on MemSQL is delivered and controlled by Equalum**, so that: (1) no matter how heavily SQL is used, no **"transaction stealing"** occurs against the original data providers; (2) the original data remains **maintained and managed as usual** on the existing, battle-tested and stably operating databases, so the MemSQL side can be built purely to performance-oriented data-computing specifications; and (3) data extracted from different upstream databases can be made transparent SQL targets on MemSQL, so there is **no need to forcibly consolidate the upstream data-source layer**. Those should be the advantages ...! Or so goes my urban-legend-grade (?) hypothesis.
First, let's create a mechanism in Python that automatically performs the periodic processing. This time, giving **"getting it working"** priority, I will hurry up and build it on the **now-nostalgic (?) version 2.7** (porting to version 3 can happen later, time permitting ... (sweat)).
```python
# coding: utf-8
#
# Execute tasks in Python at regular intervals (brute-force version)
# Version 2.7

import sys
stdout = sys.stdout
reload(sys)
sys.setdefaultencoding('utf-8')
sys.stdout = stdout

# Modules to import
import schedule
import time

# Define the process to run at regular intervals here
def job():

    print("********************************")
    print("Executing the specified job")
    print("********************************")

    from datetime import datetime
    print("JOB start date and time: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))

    # NOTE: the original WHERE clause was missing its comparison; restored here
    # as "rows from the last 60 seconds", which the 60-second interval suggests
    JOB_SQL = "SELECT DISTINCT * FROM Test_Table WHERE ts >= DATE_SUB(NOW(), INTERVAL 60 SECOND) AND Category = 'Test_Category' ORDER BY ts"
    print(JOB_SQL)

    print("********************************")
    print("The specified job has been executed")
    print("********************************")
    print
# Main routine from here
def main():

    # Variables to use
    Loop_Count = 3
    Count = 0
    Interval_Time = 60

    # Start time of the whole process
    from datetime import datetime
    print("Program start date and time: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
    print

    # Every 10 minutes
    # schedule.every(10).minutes.do(job)
    # Every 2 hours
    # schedule.every(2).hours.do(job)
    # At 10:00 every day
    # schedule.every().day.at("10:00").do(job)
    # Every Monday
    # schedule.every().monday.do(job)
    schedule.every(Interval_Time).seconds.do(job)

    # Run the pending jobs in an infinite loop
    while True:
        schedule.run_pending()
        # Mysterious spell ... ww
        time.sleep(Interval_Time)
        # Stop after the specified number of runs
        if (Count >= Loop_Count):
            break
        else:
            Count += 1

    print(str(Count) + " runs: the specified number of jobs has been completed!")
    print

    from datetime import datetime
    print("Program end date and time: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))

if __name__ == "__main__":
    main()
```
Let's quickly make sure it works.
Next, create the table on MemSQL into which the original data will be inserted. This is also brute force, but in Python it is set up as follows.
```python
# coding: utf-8
#
# Create a centralized sales data table on MemSQL
# Version 2.7

# Initial setup
import sys
stdout = sys.stdout
reload(sys)
sys.setdefaultencoding('utf-8')
sys.stdout = stdout

import time

# MemSQL's MySQL compatibility lets us use this driver as-is!
import pymysql.cursors

# Drop any existing table, then create a new one
Table_Init = "DROP TABLE IF EXISTS Qiita_Test"
Table_Make = "CREATE TABLE IF NOT EXISTS Qiita_Test"

# Table definition (written quick and dirty for now)
DC0 = "id BIGINT AUTO_INCREMENT, ts TIMESTAMP(6), PRIMARY KEY(id, ts), "
DC1 = "Category VARCHAR(20), Product VARCHAR(20), Price INT, Units INT, "
DC2 = "Card VARCHAR(40), Number VARCHAR(30), Payment INT, Tax INT, "
DC3 = "User VARCHAR(20), Zip VARCHAR(10), Prefecture VARCHAR(10), Address VARCHAR(60), Tel VARCHAR(15), Email VARCHAR(40)"

# Start processing
print("****************************************************")
print("Create a centralized sales table for validation on MemSQL")
print("****************************************************")

from datetime import datetime
print("Start date and time of table creation: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
print

try:
    # Connect to MemSQL
    db = pymysql.connect(host='xxx.xxx.xxx.xxx',
                         port=3306,
                         user='qiita',
                         password='adminqiita',
                         db='Test',
                         charset='utf8',
                         cursorclass=pymysql.cursors.DictCursor)

    with db.cursor() as cursor:
        # Drop the existing table
        cursor.execute(Table_Init)
        db.commit()
        # Create the new table
        cursor.execute(Table_Make + "(" + DC0 + DC1 + DC2 + DC3 + ")")
        db.commit()

except KeyboardInterrupt:
    print("************************")
    print('!!!!! Interrupt occurred !!!!!')
    print("************************")
    print

finally:
    # Close the database connection
    db.close()
    print("****************************************************************")
    print("Finished creating the centralized sales table for validation on MemSQL")
    print("****************************************************************")
    print("End date and time of table creation: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
    print
```
In this verification as well, we generate plausible-looking data with the Python staple, Faker. The script below was prepared by brute force, prioritizing getting data generated quickly. In the actual verification, this script runs continuously to grow the main table on MemSQL while we throw several SQL patterns at that table and process the results.
```python
# coding: utf-8
#
# Continuously insert data into the centralized sales table on MemSQL
# Version 2.7

# Initial setup
import sys
stdout = sys.stdout
reload(sys)
sys.setdefaultencoding('utf-8')
sys.stdout = stdout

import time
import pymysql.cursors
import re

# Header of the SQL statement used below
SQL_Head = "INSERT INTO Qiita_Test"
# Metadata definition used for validation
Category_Name = ["Alcoholic beverages", "Home appliances", "Books", "DVD / CD", "Miscellaneous goods"]
Product_Name0 = ["Sake", "Bourbon", "Beer", "Imo Shochu", "Red Wine", "White Wine", "Scotch", "Brandy", "Awamori", "Tequila"]
Product_Price0 = [1980, 2500, 490, 2000, 3000, 2500, 3500, 5000, 1980, 2000]
Product_Name1 = ["TV", "Washing Machine", "Radio", "Stereo", "Microwave Oven", "PC", "Battery", "Air Conditioner", "Dryer", "Vacuum Cleaner"]
Product_Price1 = [49800, 39800, 2980, 88000, 29800, 64800, 198, 64800, 35800, 24800]
Product_Name2 = ["Weekly magazine", "History", "Photobook", "Cartoon", "Reference book", "Fiction", "Economy", "Self-development", "Monthly magazine", "New issue"]
Product_Price2 = [280, 1500, 2500, 570, 1480, 1400, 1800, 1540, 980, 1980]
Product_Name3 = ["Western music", "Enka", "J-pop", "Western movies", "Idol", "Classical", "Japanese movies", "Serial drama", "Planning", "Anime"]
Product_Price3 = [1980, 2200, 2500, 3500, 2980, 1980, 3800, 2690, 1980, 2400]
Product_Name4 = ["Detergent", "Light bulb", "Gift", "Quasi-drug", "Pet food", "Batteries", "Stationery", "Men's goods", "Women's goods", "Seasonal goods" ]
Product_Price4 = [498, 198, 1980, 398, 980, 248, 398, 2980, 3580, 1980]
# Data columns for writing (aligned with the table definition)
DL1 = "Category, Product, Price, Units, "
DL2 = "Card, Number, Payment, Tax, "
DL3 = "User, Zip, Prefecture, Address, Tel, Email"

# Demo processing starts here
print("********************************************************************")
print("Start automatic generation & insertion of data into the centralized sales table on MemSQL")
print("********************************************************************")
print

from datetime import datetime
print("Start date and time of data insertion: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
print
try:
    # Set up Python's automatic data generator (Faker)
    from faker.factory import Factory
    Faker = Factory.create
    fakegen = Faker()
    fakegen.seed(0)
    fakegen = Faker("ja_JP")

    # Variable definitions
    # Total number of rows to generate -> change as appropriate
    ##########################################################
    Loop_Count = 10000
    ##########################################################
    # Timing adjustment flag (0: no wait  1: 1 second  2: random)
    ##########################################################
    Wait_Flag = 2
    ##########################################################
    # A few options to make it feel more demo-like later
    # Fixed interval (in seconds of system time)
    Sleep_Wait = 1
    # Random interval (adjust to match the actual situation)
    Base_Count = 500000
    # Other variables
    Counter = 0
    Work_Count = 1

    # Connect to MemSQL
    db = pymysql.connect(host='xxx.xxx.xxx.xxx',
                         port=3306,
                         user='qiita',
                         password='adminqiita',
                         db='Test',
                         charset='utf8',
                         cursorclass=pymysql.cursors.DictCursor)
    with db.cursor() as cursor:
        # Generate the verification data
        while Counter < Loop_Count:
            # Pick a product category at random
            # (there are 5 categories, so fold the digit 0-9 into 0-4)
            Category_ID = fakegen.random_digit()
            if Category_ID > 4: Category_ID = Category_ID - 5

            # Category name
            Category = Category_Name[Category_ID]

            # Pick one of the 10 products in each category
            Product_ID = fakegen.random_digit()

            # Select the column information matching the category
            # (the number of units is adjusted per category)
            if Category_ID == 0:
                Product = Product_Name0[Product_ID]
                Price = Product_Price0[Product_ID]
                Units = fakegen.random_digit() + 1
            elif Category_ID == 1:
                Product = Product_Name1[Product_ID]
                Price = Product_Price1[Product_ID]
                Units = 1
            elif Category_ID == 2:
                Product = Product_Name2[Product_ID]
                Price = Product_Price2[Product_ID]
                Units = fakegen.random_digit() + 1
                if Units > 3: Units = 3
            elif Category_ID == 3:
                Product = Product_Name3[Product_ID]
                Price = Product_Price3[Product_ID]
                Units = fakegen.random_digit() + 1
                if Units > 2: Units = 2
            else:
                Product = Product_Name4[Product_ID]
                Price = Product_Price4[Product_ID]
                Units = fakegen.random_digit() + 1
                if Units > 4: Units = 4
            # Payment information ("Cash" unified here: the original mixed
            # "cash" and "Cash", so the N/A branch could never fire)
            if str(fakegen.pybool()) == "True":
                Card = "Cash"
            else:
                Card = fakegen.credit_card_provider()
            Number = fakegen.credit_card_number()
            if Card == "Cash": Number = "N/A"

            # Calculate the total payment and sales tax
            Payment = Price * Units
            Tax = Payment * 0.1

            # Generate the purchaser information
            User = fakegen.name()
            Zip = fakegen.zipcode()
            Address = fakegen.address()

            # Extract the prefecture part of the address; fakegen is ja_JP,
            # so the pattern has to stay in Japanese
            pattern = u"東京都|北海道|(?:京都|大阪)府|.{2,3}県"
            m = re.match(pattern, Address)
            if m:
                Prefecture = m.group()
            else:
                Prefecture = ""  # fallback so the INSERT never sees an unset variable

            Tel = fakegen.phone_number()
            Email = fakegen.ascii_email()
            # Build the INSERT for the regulation table of each database
            DV1 = Category + "','" + Product + "','" + str(Price) + "','" + str(Units) + "','"
            DV2 = Card + "','" + Number + "','" + str(Payment) + "','" + str(Tax) + "','"
            DV3 = User + "','" + Zip + "','" + Prefecture + "','" + Address + "','" + Tel + "','" + str(Email)
            SQL_Data = SQL_Head + "(" + DL1 + DL2 + DL3 + ") VALUES('" + DV1 + DV2 + DV3 + "')"

            cursor.execute(SQL_Data)
            db.commit()

            # Show the generated data on the console (comment out if not needed)
            print(SQL_Data)
            print

            # Adjust the generation interval
            if Wait_Flag == 1:
                time.sleep(Sleep_Wait)
            elif Wait_Flag == 2:
                Wait_Loop = Base_Count * fakegen.random_digit() + 1
                for i in range(Wait_Loop): Work_Count = Work_Count + i

            # Update the loop counter
            Counter = Counter + 1

except KeyboardInterrupt:
    print("************************")
    print('!!!!! Interrupt occurred !!!!!')
    print("************************")
    print

finally:
    # Close the database connection
    db.close()
    print("**************************************")
    print("Total number of generated rows: " + str(Counter))
    print("**************************************")
    print
    print("************************************************************************")
    print("Finished automatic generation & insertion of data into the centralized sales table on MemSQL")
    print("************************************************************************")
    print("End date and time of data insertion: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
    print
```
Check the operation just in case.
Also check the contents of the table.
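A quick check can also be done with a few lines of pymysql (a minimal sketch against the same Test database; the LIMIT value is arbitrary):

```python
# Sanity check: total row count, plus the five most recent rows.
import pymysql.cursors

db = pymysql.connect(host='xxx.xxx.xxx.xxx', port=3306,
                     user='qiita', password='adminqiita', db='Test',
                     charset='utf8', cursorclass=pymysql.cursors.DictCursor)
try:
    with db.cursor() as cursor:
        cursor.execute("SELECT COUNT(*) AS Cnt FROM Qiita_Test")
        print(cursor.fetchone())
        cursor.execute("SELECT * FROM Qiita_Test ORDER BY ts DESC LIMIT 5")
        for row in cursor.fetchall():
            print(row)
finally:
    db.close()
```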
Change the **job()** part of the script prepared first as follows. (The part that handles the query result is designed with future verification in mind.)
```python
# Start processing
def job():

    from datetime import datetime
    print("JOB execution date and time: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
    print

    # Connect to MemSQL
    db = pymysql.connect(host='xxx.xxx.xxx.xxx',
                         port=3306,
                         user='qiita',
                         password='adminqiita',
                         db='Test',
                         charset='utf8',
                         cursorclass=pymysql.cursors.DictCursor)

    with db.cursor() as cursor:
        # Query that counts the rows inserted so far
        SQL_Data = "SELECT COUNT(*) FROM Qiita_Test"

        # Send the query and commit
        cursor.execute(SQL_Data)
        db.commit()

        # Fetch the query result
        rows = cursor.fetchall()

        # Initialize the working buffer
        Tmp_Data = []

        # The queries will grow richer in the future, so be prepared for that ...
        for Query_Data in rows:
            for item in Query_Data.values():
                Tmp_Data.append(item)

        print("Number of rows at this point: " + str(Tmp_Data[0]))
        print

    db.close()
```
In this verification, the variables are set so that the job executes three times at 5-second intervals. The data generation interval is set to random, aiming for a somewhat more realistic feel; start the generator first, and then run the modified script without a break.
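For reference, that run corresponds to settings along these lines in the scheduler's main() (only the values differ from the listing above):

```python
# Run configuration assumed for this verification:
# execute the job three times at 5-second intervals
Loop_Count = 3      # number of job executions to wait for
Count = 0
Interval_Time = 5   # seconds between executions
schedule.every(Interval_Time).seconds.do(job)
```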
Everything seems to have worked safely, so I will stop the preparation here for this round. Next time, I will flesh out the contents of the periodically processed tables and the **job()** created this time.
**To be continued in: Verifying MemSQL's processing power (Application 2) ...**
This verification is carried out using the official Free version (V6) of MemSQL.
** About the Free version of MemSQL ... **
We would like to thank MemSQL for providing this valuable opportunity. If anything in this article differs from the content published on MemSQL's official website, the official information takes precedence; please bear this in mind.