Cette fois, je voudrais exécuter le traitement à intervalles réguliers en continu pour une table MemSQL dans laquelle des données sont insérées en continu, extraire les données qui tombent dans la plage de temps spécifiée et générer une table. .. Puisqu'il s'agit d'une vérification dans une situation où le temps est poussé, je ne peux pas nier le sentiment que c'est un peu lâche, mais d'un autre côté, MemSQL, qui exécute plusieurs processus d'insertion par seconde, a reçu une autre demande de processus successivement. Afin de confirmer quel genre de mouvement ce sera au cas où, je vais l'essayer pour le moment.
En tant que tâche commune, ne faites pas tout ensemble! Il existe un traitement par lots dans le monde des moules, mais cette fois, il existe une solution appelée Equalum qui permet de créer un streaming en temps réel qui a été vérifié et introduit pendant longtemps sans programme, nous avons donc supposé coopérer avec lui ** " Quel MemSQL est l'environnement à réaliser "sans être appelé un voleur de transaction", "ne vous inquiétez pas du temps", "comme vous le souhaitez" et "traitement créatif des requêtes" ** pour le système de données d'origine existant. Pouvez-vous y aller? J'aimerais voir.
Tout d'abord, modifiez le script Python précédent en un processus qui s'exécute régulièrement. (1) Créez une nouvelle table en générant un nom de table avec les informations de date et d'heure du timing de démarrage (2) Extraire les informations nécessaires de la table originale insérée en continu en spécifiant la plage de temps (3) Stockez les informations renvoyées dans la nouvelle table créée en (1) Je vais mettre dans le processus. (Cette fois aussi, il peut être plein d'endroits Tsukkomi car ce sera une version Ayer en raison d'un travail de puissance, mais cette zone est tout à fait indulgente ...)
Au fait, le SQL utilisé cette fois est le suivant.
SELECT ts, Category, Product, Price, Units, Card, Payment, Prefecture
FROM Qiita_Test
WHERE ts BETWEEN 'YYYY/mm/dd HH:MM:SS.000000' AND 'YYYY/mm/dd HH:MM:SS.000000'
SORDER BY ts;
La vérification a été effectuée en supposant que la série de processus définie cette fois sera exécutée toutes les 30 secondes et les données de ce point à 30 secondes auparavant seront extraites comme conditions. (En fait, il y a un léger écart dû à d'autres influences ... (sueur)) La charge d'extraction de condition et de création de nouvelle table est appliquée en interrompant périodiquement la table insérée en continu. Si nous pouvons vérifier ce qui se passe lorsque nous l'appelons, nous procéderons au travail en disant OK pour le moment.
coding: utf-8
# Exécuter des tâches à intervalles réguliers en Python (version Power Technique)
# Version 2.7
import sys
stdout = sys.stdout
reload(sys)
sys.setdefaultencoding('utf-8')
sys.stdout = stdout
# Module à importer
import schedule
import time
import pymysql.cursors
# Informations utilisées dans les instructions SQL
SQL1 = "SELECT ts, Category, Product, Price, Units, Card, Payment, Prefecture FROM Qiita_Test "
SQL3 = " ORDER BY ts"
# Définition de la colonne de la table d'instantanés
DC0 = "id BIGINT AUTO_INCREMENT, PRIMARY KEY(id), O_ts DATETIME(6), "
DC1 = "Category VARCHAR(20), Product VARCHAR(20), Price INT, Units INT, "
DC2 = "Card VARCHAR(40), Payment INT, Prefecture VARCHAR(10)"
# Définition de colonne à écrire en SQL
DL1 = "O_ts, Category, Product, "
DL2 = "Price, Units, Card, Payment, "
DL3 = "Prefecture"
# Définissez ici le processus pour qu'il s'exécute à intervalles réguliers
def job():
# Réglage de l'heure pour revenir à la synchronisation de l'instantané
Time_Int = 30
Time_Adj = 0
from datetime import datetime, date, time, timedelta
#Obtenir des informations sur la date et l'heure actuelles
now = datetime.now()
print ("Démarrer JOB:" + now.strftime ("% Y /% m /% d% H:% M:% S"))
# Générer le nom de la table pour l'instantané et générer une instruction SQL
dt = 'Qiita_' + now.strftime('%Y%m%d_%H%M%S')
Table_Make = "CREATE TABLE IF NOT EXISTS " + dt
SQL_Head = "INSERT INTO " + dt
#Fin des informations de réglage de l'heure utilisées dans SQL (Ajuster avec Time_Adj si une correction est nécessaire)
pre_sec = now - timedelta(seconds = Time_Int + Time_Adj)
from_dt = pre_sec.strftime("%Y/%m/%d %H:%M:%S")
#Informations de démarrage du réglage de l'heure utilisé dans SQL (Ajuster avec Time_Adj et + -si une correction est requise)
now_sec = now + timedelta(seconds = Time_Adj)
to_dt = now_sec.strftime("%Y/%m/%d %H:%M:%S")
#Generate instruction SQL avec plage de temps
SQL2 = "WHERE ts BETWEEN '" + from_dt + ".000000' AND '" + to_dt + ".000000'"
SQL_Def = SQL1 + SQL2 + SQL3
#Connectez-vous avec MemSQL
db = pymysql.connect(host = 'xxx.xxx.xxx.xxx',
port=3306,
user='qiita',
password='adminqiita',
db='Test',
charset='utf8',
cursorclass=pymysql.cursors.DictCursor)
with db.cursor() as cursor:
cursor.arraysize = 1000
#Créer une nouvelle table pour les instantanés
cursor.execute(Table_Make +"("+DC0+DC1+DC2+")" )
db.commit()
#Initialiser le tampon de travail
Tmp_Data = []
# Envoyer SQL pour requête et validation
cursor.execute(SQL_Def)
db.commit()
# Obtenir les résultats de la requête
rows = cursor.fetchall()
# Obtenir les résultats de la requête
for Query_Data in rows:
for item in Query_Data.values():
Tmp_Data.append(item)
#Reflect dans chaque colonne
Category = str(Tmp_Data[0])
Product = str(Tmp_Data[1])
Price = str(Tmp_Data[2])
O_ts = str(Tmp_Data[3])
Units = str(Tmp_Data[4])
Prefecture = str(Tmp_Data[5])
Payment = str(Tmp_Data[6])
Card = str(Tmp_Data[7])
#Créer SQL et stocker dans une table d'instantanés
DV1 = O_ts + "','" + Category + "','" + Product + "','"
DV2 = Price + "','" + Units + "','" + Card + "','" + Payment + "','"
DV3 = Prefecture
SQL_Data = SQL_Head + "("+DL1+DL2+DL3+") VALUES('"+DV1+DV2+DV3+"')"
# Insérer dans le tableau pour un instantané
cursor.execute(SQL_Data)
db.commit()
Tmp_Data = []
#Disconnect de la connexion à la base de données
db.close()
print ("Fin du JOB:" + datetime.now (). strftime ("% Y /% m /% d% H:% M:% S"))
print ("+++++++++++++++++++++++++++++++")
print
# Partie principale d'ici
def main():
# Paramétrage des variables à utiliser
Loop_Count = 3
Count = 0
Interval_Time = 30
# Heure de début de l'ensemble du processus
from datetime import datetime
print ("Date et heure de début du programme:" + datetime.now (). Strftime ("% Y /% m /% d% H:% M:% S"))
print
# Toutes les 10 minutes
# schedule.every(10).minutes.do(job)
# Toutes les 2 heures
# schedule.every(2).hours.do(job)
# 10 heures tous les jours
# schedule.every().day.at("10:00").do(job)
# tous les lundis
# schedule.every().monday.do(job)
schedule.every(Interval_Time).seconds.do(job)
# Processus en boucle infinie
while True:
schedule.run_pending()
#Sortiment mystérieux ... ww
time.sleep(Interval_Time)
# Vérifiez le nombre de fois spécifié
if (Count >= Loop_Count):
break
else:
Count += 1
print
print (str (Count) + "times: Le nombre spécifié de travaux est terminé!")
print
from datetime import datetime
print ("Date et heure de fin du programme:" + datetime.now (). Strftime ("% Y /% m /% d% H:% M:% S"))
if __name__ == "__main__":
main()
Le résultat de l'exécution de ce script est le suivant.
Date et heure de début du programme: 2020/10/12 14:31:35
Début du JOB: 12/10/2020 14:32:05
Fin du JOB: 12/10/2020 14:32:06
+++++++++++++++++++++++++++++++
Début du JOB: 12/10/2020 14:32:36
Fin du JOB: 12/10/2020 14:32:37
+++++++++++++++++++++++++++++++
Début du JOB: 12/10/2020 14:33:07
Fin du JOB: 12/10/2020 14:33:09
+++++++++++++++++++++++++++++++
3 fois: le nombre de travaux spécifié a été terminé!
Date et heure de fin du programme: 2020/10/12 14:33:39
Si cela fonctionne comme prévu, vous devriez avoir trois tables avec des informations d'heure de début sur MemSQL. Tout d'abord, je vais le vérifier avec le familier ** DBeaver ** à chaque fois.
Il semble qu'il a été complété avec succès avec la table ** Qiita_Test ** de la table d'origine. Au cas où, vérifions le début et la fin de chaque tableau.
En raison de l'ordre dans lequel le script d'insertion et ce script ont été lancés, l'heure initiale des données est un peu tardive, mais il semble que l'extraction de la montée soit bien traitée pour le moment.
Ensuite, comparons les parties finales.
La dernière donnée, TIMESTAMP (6), est 14: 32: 04: 804694, vous pouvez donc voir qu'elle correspond comme prévu.
Vérifiez également les deux autres.
J'ai manqué les données entre les intervalles de traitement, mais la plage de temps définie est couverte. (Je dois travailler sur la logique ... (suer))
Il est en toute sécurité dans la plage.
Le dernier tableau est ...
Comme auparavant, nous ne pouvions pas collecter 2 à 3 éléments de données correspondant à la limite du calcul du temps dans le processus de traitement, mais il semble qu'ils se situent dans la plage de temps définie dans l'instruction SQL.
Si je peux contrôler le temps avec un peu plus de soin, je pense que la précision de l'utilisation des données dans un cycle plus précis et plus court s'améliorera, mais avec l'aide d'une alimentation urgente en mémoire, j'ai introduit un gros processus (par exemple, je l'ai déjà présenté). Même si MemSQL est exécuté (comme le remplacement de la partie d'insertion continue effectuée dans cette vérification à l'aide d'Equalum), il est possible d'extraire librement des données et d'effectuer le travail du processus suivant avec des performances de l'ordre de la milliseconde. J'espère que vous comprenez que c'est extrêmement réaliste.
Dans un cycle de traitement court, il y a fondamentalement peu de données à traiter, il est donc possible d'effectuer un traitement d'extraction / agrégation de données de manière simulée tout en supposant librement une situation extrêmement proche du présent. Don tous ensemble dans un lot de minuit! Comme ce n'est pas le cas, toutes les données qui peuvent être traitées jusqu'à ce point à tout moment sont extraites et prétraitées de la source de données qui traverse le mur du silo en amont en utilisant Equalum, et étendues sur l'espace mémoire de MemSQL ( À ce stade, les informations du silo sont réparties à plat dans l'espace mémoire géré par la même base de données), et il devient aisé d'utiliser les données de manière transversale et transparente pour le silo.
Bien sûr, même lorsque vous définissez une requête d'extraction (y compris cette spécification de temps) qui définit certaines conditions pour une table contenant 100000 lignes d'informations développées du côté d'origine sur MemSQL, Étant donné que les performances de l'ordre de la milliseconde peuvent être maintenues de manière stable, si l'espace d'extension de la mémoire est censé être important, mettez en cluster les serveurs IA nécessaires pour fournir un calcul complet des données en mémoire. Il est possible de réaliser.
La table extraite par traitement par lots cette fois-ci peut être lue comme la table Excel introduite plus tôt, elle peut donc être utilisée comme BI en utilisant les fonctions d'Excel, et la compatibilité MySQL élevée de MemSQL est obtenue. Il sera possible de l'utiliser immédiatement comme matériau source pour d'autres solutions BI / IA et autres qui ont été utilisées. ** Utilisons MemSQL Vol.6: Derailment ** peut être utile.
La prochaine fois, j'aimerais essayer quelques liens externes en utilisant le tableau extrait cette fois. Ce travail n'est pas directement lié à MemSQL, mais je creuse un peu plus dans la "base de données opérationnelle" (je me demande si ce n'est pas relationnel ...) que MemSQL met en message, et les transactions modernes récentes. Vérifier la collaboration avec le calcul des données système,
** Système de données existant >> Equalum >> MemSQL >> BI / AI / robot etc. ** ** Relationnel <<<<<<<<<<<< >>>>>>>> Opérationnel **
Je voudrais jeter un coup d'œil dans le monde de.
Cette vérification est effectuée à l'aide de la version gratuite officielle (V6) de MemSQL.
** À propos de la version gratuite de MemSQL ... **
Nous tenons à remercier MemSQL d'avoir fourni cette précieuse opportunité, et si ce contenu diffère du contenu publié sur le site officiel de MemSQL, les informations de MemSQL prévaudront. Veuillez comprendre cela.