This time, I would like to run a periodic process against a MemSQL table that is receiving continuous inserts, extract the rows that fall within a specified time range, and generate a new table from them. Since this verification was done under time pressure, I can't deny that it is a little rough. Still, I wanted to see how MemSQL behaves when it is executing several inserts per second and then receives other processing requests one after another, so I will give it a try for now.
A common pattern in the traditional world is batch processing that handles everything in one go. This time, however, there is Equalum, a solution we have been verifying and introducing that can build real-time streaming without programming. Assuming cooperation with it, I would like to see **whether MemSQL can serve as the environment for "creative query processing" against the existing source data system, "as you wish", "without worrying about the time", and "without being called a transaction thief"**.
First, I will modify the earlier Python script into a process that runs at regular intervals. The process will (1) create a new table, generating the table name from the date and time at startup, (2) extract the necessary information from the continuously inserted original table by specifying a time range, and (3) store the returned information in the new table created in (1). (As before, this is a quick, brute-force version, so there may be plenty to nitpick, but please go easy on that part ...)
By the way, the SQL used this time is as follows.
SELECT ts, Category, Product, Price, Units, Card, Payment, Prefecture
FROM Qiita_Test
WHERE ts BETWEEN 'YYYY/mm/dd HH:MM:SS.000000' AND 'YYYY/mm/dd HH:MM:SS.000000'
ORDER BY ts;
The verification assumed that this series of processes runs every 30 seconds and extracts the data from 30 seconds earlier up to that point. (In practice there is a slight deviation due to other influences ... (sweat)) The load of conditional extraction plus new table creation is applied by periodically interrupting a table that is being inserted into continuously. If I can verify what happens under that load, I will call it OK for now and move on.
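As a minimal sketch of the time-range idea described above, the following computes the 30-second window and fills it into the extraction query. The helper names `build_window` and `build_query` are made up for illustration and are not part of the script that follows. The sketch truncates to whole seconds and appends `.000000`, which is an assumption taken from the SQL template shown earlier.

```python
from datetime import datetime, timedelta

TS_FORMAT = "%Y/%m/%d %H:%M:%S"


def build_window(now, interval_sec=30, adjust_sec=0):
    """Return (from_str, to_str) for the interval_sec seconds leading up to now."""
    start = now - timedelta(seconds=interval_sec + adjust_sec)
    end = now + timedelta(seconds=adjust_sec)
    # Sub-second precision is dropped and replaced by .000000, as in the SQL template
    return (start.strftime(TS_FORMAT) + ".000000",
            end.strftime(TS_FORMAT) + ".000000")


def build_query(now, interval_sec=30):
    """Fill the time window into the extraction query against Qiita_Test."""
    from_ts, to_ts = build_window(now, interval_sec)
    return ("SELECT ts, Category, Product, Price, Units, Card, Payment, Prefecture "
            "FROM Qiita_Test "
            "WHERE ts BETWEEN '%s' AND '%s' "
            "ORDER BY ts" % (from_ts, to_ts))


# Example: a job that starts at 14:32:05 looks back to 14:31:35
print(build_query(datetime(2020, 10, 12, 14, 32, 5)))
```

Because the start time is truncated to whole seconds, rows that arrive in the fraction of a second after the job starts are outside the window, which is worth keeping in mind when reading the results.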
# coding: utf-8
# Execute tasks in Python at regular intervals (brute-force version)
# Python 2.7
import sys
stdout = sys.stdout
reload(sys)
sys.setdefaultencoding('utf-8')
sys.stdout = stdout
# Module to import
import schedule
import time
import pymysql.cursors
# Information used in SQL statements
SQL1 = "SELECT ts, Category, Product, Price, Units, Card, Payment, Prefecture FROM Qiita_Test "
SQL3 = " ORDER BY ts"
# Snapshot table column definition
DC0 = "id BIGINT AUTO_INCREMENT, PRIMARY KEY(id), O_ts DATETIME(6), "
DC1 = "Category VARCHAR(20), Product VARCHAR(20), Price INT, Units INT, "
DC2 = "Card VARCHAR(40), Payment INT, Prefecture VARCHAR(10)"
# Column definition to write in SQL
DL1 = "O_ts, Category, Product, "
DL2 = "Price, Units, Card, Payment, "
DL3 = "Prefecture"
# Set the process to run at regular intervals here
def job():
    # How far back to go from the snapshot timing (seconds), plus an optional correction
    Time_Int = 30
    Time_Adj = 0
    from datetime import datetime, timedelta
    # Get current date and time information
    now = datetime.now()
    print ("Start of JOB: " + now.strftime("%Y/%m/%d %H:%M:%S"))
    # Generate a table name for the snapshot and the heads of the SQL statements
    dt = 'Qiita_' + now.strftime('%Y%m%d_%H%M%S')
    Table_Make = "CREATE TABLE IF NOT EXISTS " + dt
    SQL_Head = "INSERT INTO " + dt
    # Start of the time range used in SQL (adjust with Time_Adj if correction is required)
    pre_sec = now - timedelta(seconds = Time_Int + Time_Adj)
    from_dt = pre_sec.strftime("%Y/%m/%d %H:%M:%S")
    # End of the time range used in SQL (adjust with Time_Adj if correction is required)
    now_sec = now + timedelta(seconds = Time_Adj)
    to_dt = now_sec.strftime("%Y/%m/%d %H:%M:%S")
    # Generate SQL statement with time range
    SQL2 = "WHERE ts BETWEEN '" + from_dt + ".000000' AND '" + to_dt + ".000000'"
    SQL_Def = SQL1 + SQL2 + SQL3
    # Connect to MemSQL
    db = pymysql.connect(host = 'xxx.xxx.xxx.xxx',
                         port=3306,
                         user='qiita',
                         password='adminqiita',
                         db='Test',
                         charset='utf8',
                         cursorclass=pymysql.cursors.DictCursor)
    with db.cursor() as cursor:
        cursor.arraysize = 1000
        # Create a new table for the snapshot
        cursor.execute(Table_Make + "(" + DC0 + DC1 + DC2 + ")")
        db.commit()
        # Send SQL for query and commit
        cursor.execute(SQL_Def)
        db.commit()
        # Get query results
        rows = cursor.fetchall()
        for Query_Data in rows:
            # DictCursor returns each row as a dict, so read the columns by name
            # instead of relying on the order in which the dict yields its values
            O_ts = str(Query_Data['ts'])
            Category = str(Query_Data['Category'])
            Product = str(Query_Data['Product'])
            Price = str(Query_Data['Price'])
            Units = str(Query_Data['Units'])
            Card = str(Query_Data['Card'])
            Payment = str(Query_Data['Payment'])
            Prefecture = str(Query_Data['Prefecture'])
            # Create SQL and store in snapshot table
            DV1 = O_ts + "','" + Category + "','" + Product + "','"
            DV2 = Price + "','" + Units + "','" + Card + "','" + Payment + "','"
            DV3 = Prefecture
            SQL_Data = SQL_Head + "(" + DL1 + DL2 + DL3 + ") VALUES('" + DV1 + DV2 + DV3 + "')"
            # Insert into table for snapshot
            cursor.execute(SQL_Data)
            db.commit()
    # Disconnect database connection
    db.close()
    print ("End of JOB: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
    print ("+++++++++++++++++++++++++++++++")
    print
# Main part from here
def main():
    # Set variables to use
    Loop_Count = 3
    Count = 0
    Interval_Time = 30
    # Start time of the whole process
    from datetime import datetime
    print ("Program start date and time: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
    print
    # Every 10 minutes
    # schedule.every(10).minutes.do(job)
    # Every 2 hours
    # schedule.every(2).hours.do(job)
    # 10 o'clock every day
    # schedule.every().day.at("10:00").do(job)
    # Every Monday
    # schedule.every().monday.do(job)
    schedule.every(Interval_Time).seconds.do(job)
    # Perform processing in an infinite loop
    while True:
        schedule.run_pending()
        # Mysterious spell ... (lol)
        time.sleep(Interval_Time)
        # Check the specified number of times
        if (Count >= Loop_Count):
            break
        else:
            Count += 1
    print
    print (str(Count) + " times: The specified number of jobs have been completed!")
    print
    print ("Program end date and time: " + datetime.now().strftime("%Y/%m/%d %H:%M:%S"))

if __name__ == "__main__":
    main()
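As a side note, steps (2) and (3) of the script fetch every row to the client and insert them back one by one. One alternative that was not tried in this verification is a single `INSERT ... SELECT`, which lets the database copy the rows itself. The sketch below only builds the statement; the helper name `build_snapshot_copy` is hypothetical, and it assumes that the MemSQL version in use accepts the MySQL-style `INSERT ... SELECT` form, which should be confirmed against the MemSQL documentation. The `ORDER BY` is left out on purpose, so the order of the generated `id` values is not guaranteed to follow `ts`.

```python
import re

# Columns shared by the source table and the snapshot table
COLUMNS = ["Category", "Product", "Price", "Units", "Card", "Payment", "Prefecture"]


def build_snapshot_copy(table_name, from_ts, to_ts, source="Qiita_Test"):
    """Return (sql, params) that copy one time window into the snapshot table."""
    # Table names cannot be passed as query parameters, so validate before formatting
    if not re.match(r"^[A-Za-z0-9_]+$", table_name):
        raise ValueError("unexpected table name: %r" % table_name)
    target_cols = ", ".join(["O_ts"] + COLUMNS)
    source_cols = ", ".join(["ts"] + COLUMNS)
    sql = ("INSERT INTO {t} ({tc}) "
           "SELECT {sc} FROM {s} "
           "WHERE ts BETWEEN %s AND %s").format(
               t=table_name, tc=target_cols, sc=source_cols, s=source)
    # The time range is passed separately so that the driver handles the quoting
    return sql, (from_ts, to_ts)


sql, params = build_snapshot_copy(
    "Qiita_20201012_143205",
    "2020/10/12 14:31:35.000000",
    "2020/10/12 14:32:05.000000")
print(sql)
```

With PyMySQL, the pair would be passed as `cursor.execute(sql, params)`, followed by a single `db.commit()`.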
Running the periodic snapshot script gave the following result.
Program start date and time: 2020/10/12 14:31:35
Start of JOB: 2020/10/12 14:32:05
End of JOB: 2020/10/12 14:32:06
+++++++++++++++++++++++++++++++
Start of JOB: 2020/10/12 14:32:36
End of JOB: 2020/10/12 14:32:37
+++++++++++++++++++++++++++++++
Start of JOB: 2020/10/12 14:33:07
End of JOB: 2020/10/12 14:33:09
+++++++++++++++++++++++++++++++
3 times: The specified number of jobs have been completed!
Program end date and time: 2020/10/12 14:33:39
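In the log above, each job starts about 31 seconds after the previous one rather than exactly 30, which appears to be because the fixed 30-second sleep is added on top of the job's own run time. As a rough sketch of one way to hold a fixed cadence, the sleep length can be computed from a fixed starting point instead. The function names `next_tick` and `seconds_until_next_tick` are made up for this illustration and are not part of the `schedule` library.

```python
from datetime import datetime, timedelta


def next_tick(anchor, interval_sec, now):
    """Return the first time strictly after now on the grid anchor + k * interval_sec."""
    elapsed = (now - anchor).total_seconds()
    k = int(elapsed // interval_sec) + 1
    return anchor + timedelta(seconds=k * interval_sec)


def seconds_until_next_tick(anchor, interval_sec, now):
    """Return how long to sleep so that the next job starts on the grid."""
    return (next_tick(anchor, interval_sec, now) - now).total_seconds()


# Example: program started at 14:31:35 and the first job finished at 14:32:06
anchor = datetime(2020, 10, 12, 14, 31, 35)
finished = datetime(2020, 10, 12, 14, 32, 6)
print(next_tick(anchor, 30, finished))
print(seconds_until_next_tick(anchor, 30, finished))
```

Sleeping for the computed number of seconds instead of a constant keeps the start times on a 30-second grid as long as each job finishes within one interval.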
If it worked as expected, there should be three tables on MemSQL named with their start times. First, as always, I will check with the familiar **DBeaver**.
It seems that the three tables were created successfully alongside the original **Qiita_Test** table. Just in case, let's check the beginning and end of each table.
Because of the order in which the insertion script and this script were started, the first data timestamp is a little late, but for the time being the extraction at the leading edge seems to be handled well.
Next, let's compare the end parts.
The TIMESTAMP(6) value of the last row is 14:32:04.804694, so you can see that it falls within the range as expected.
Let's check the other two as well.
Some data between the processing intervals was missed, but the rows that were extracted fall within the defined time range. (I need to refine the logic ... (sweat))
It is safely within the range.
The last table is ...
As before, two or three rows sitting on the boundary of the time calculation were not collected, but the rows that were collected fall within the time range defined in the SQL statement.
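The rows lost at the boundaries are probably a result of each job computing its own "now minus 30 seconds" window while the jobs actually start slightly more than 30 seconds apart. One possible refinement, shown here only as a sketch under that assumption, is to remember where the previous window ended and start the next one exactly there, using a half-open range (`>=` start, `<` end) so that no row is skipped or counted twice. The names `WindowTracker` and `where_clause` are hypothetical.

```python
from datetime import datetime

TS_FORMAT = "%Y/%m/%d %H:%M:%S.%f"


class WindowTracker(object):
    """Hands out back-to-back windows: each one starts where the last one ended."""

    def __init__(self, start):
        self.last_end = start

    def next_window(self, now):
        window = (self.last_end, now)
        self.last_end = now
        return window


def where_clause(start, end):
    """Build a half-open time condition, keeping microsecond precision."""
    return "WHERE ts >= '%s' AND ts < '%s'" % (
        start.strftime(TS_FORMAT), end.strftime(TS_FORMAT))


# Example with job start times like those in the log above
tracker = WindowTracker(datetime(2020, 10, 12, 14, 31, 35))
for started in (datetime(2020, 10, 12, 14, 32, 5),
                datetime(2020, 10, 12, 14, 32, 36)):
    start, end = tracker.next_window(started)
    print(where_clause(start, end))
```

The second window begins at 14:32:05, exactly where the first ended, so the one-second gap between jobs is covered regardless of how much the start times drift.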
If I controlled the time a little more carefully, I think the accuracy of using data in a more precise, shorter cycle would improve. Still, with the help of in-memory power, I hope you can see that it is quite realistic to freely extract data and hand it to the next process with performance in the millisecond range, even while MemSQL is running a large process (for example, replacing the continuous insertion part of this verification with Equalum, which I introduced before).
In a short processing cycle, there is basically little data to handle each time, so data extraction and aggregation can be run in a simulation-like manner against a situation extremely close to the present. Rather than doing everything at once in a midnight batch, all the data available up to that point is extracted and preprocessed with Equalum from data sources across the walls of the upstream silos and expanded into the memory space of MemSQL (at this point, the information from the silos is laid out flat in a memory space managed by the same DB), which makes it easy to use the data transparently across silos.
Of course, even when an extraction query with some conditions (including a time specification like this one) is run against a table holding 100,000 rows, like the source table expanded on MemSQL this time, performance in the millisecond range can be maintained stably. If the memory space needed is expected to be large, the necessary IA servers can be clustered to carry out fully in-memory data computing.
The table extracted by this batch processing can be read into Excel as a table, as introduced earlier, so it can be used for BI with the functions of Excel. Thanks to MemSQL's high MySQL compatibility, it can also be used immediately as source material for other BI / AI solutions that have been used with MySQL. **Let's use MemSQL Vol.6: Derailment** may be helpful.
Next time, I would like to try some external linkages using the table extracted this time. This work is not directly related to MemSQL, but I would like to dig a little deeper into the "operational database" (as opposed to relational, I suppose ...) that MemSQL puts forward as its message, verify cooperation with recent modern transaction-system data computing, and take a peek into the following world:
**Existing data system >> Equalum >> MemSQL >> BI / AI / robot etc ..**
**Relational <<<<<<<<<<<< >>>>>>>> Operational**
This verification was carried out using the official Free version (V6) of MemSQL.
**About the Free version of MemSQL ...**
We would like to thank MemSQL for providing this valuable opportunity. If this content differs from the content published on MemSQL's official website, MemSQL's information takes precedence. Thank you for your understanding.