[PYTHON] Vérification de la capacité de traitement MemSQL (Application 1)

Cette fois, nous vérifierons la puissance de traitement SQL de MemSQL

C'est une vérification de MemSQL qui a été effectuée plusieurs fois dans le passé, mais à partir de ce moment elle est un peu (assez ...) forcée, mais j'aimerais la vérifier avec un sentiment proche du traitement réel. MemSQL lui-même a une très haute compatibilité en mémoire avec MySQL, mais où tirer parti de ses performances de transaction élevées? Dans cette partie, il y a des cas où nous recevons des commentaires tels que nous n'avons pas besoin de cette puissance de traitement rapide. Par conséquent, cette fois, nous vérifions MemSQL depuis longtemps ** En tant que source de données côté cible d'Equalum **, tout le monde peut créer un traitement de streaming en temps réel, une IA et BI qui utilisent les données préparées là-bas, et l'IA à l'avenir Je voudrais explorer la possibilité d'une position basée sur la connaissance qui permet une simulation à grande vitesse pour divers systèmes robotisés qui perdront la frontière entre eux.

スクリーンショット 2020-10-07 9.12.50.png

Scénario supposé

Dans une situation où les informations de vente arrivent en temps réel sur MemSQL ... (1) Exécutez un processus SQL de traitement par lots à intervalles réguliers pour extraire les informations nécessaires. (2) Enfin, les informations extraites sont automatiquement générées sous la forme d'une nouvelle table et sont liées à un traitement externe (en supposant R) qui utilise cette table. Je le ferai.

** Le fait est que les données accumulées sur MemSQL sont contrôlées par Equalum ** (1) Peu importe combien vous jouez avec SQL, l'événement ** de voleur de transaction ** contre le fournisseur de données d'origine ne se produit pas. (2) Étant donné que les données d'origine sont ** maintenues et gérées comme d'habitude ** sur la base de données existante qui a flétri et fonctionne de manière stable, le mécanisme du côté MemSQL peut être construit avec des spécifications de calcul de données axées sur les performances. (3) Les données extraites de différentes bases de données en amont peuvent être ciblées pour un SQL transparent sur MemSQL, il n'est donc pas nécessaire d'intégrer de force la couche de source de données en amont **. Il y aura des avantages tels que ...! Cela devient une hypothèse légendaire (?) De ville.

スクリーンショット 2020-10-07 9.13.28.png

Préparation préalable

Tout d'abord, créez un mécanisme qui effectue automatiquement un traitement continu en Python. Cette fois, ** "Now nostalgic (?) Version 2.7" ** est créé de toute urgence ** "Operation priority" ** (Pour la version 3, je le porterai s'il reste du temps plus tard ... (suer) )

 coding: utf-8

# Exécuter des tâches à intervalles réguliers en Python (version Power Technique)
# Version 2.7


import sys
stdout = sys.stdout
reload(sys)
sys.setdefaultencoding('utf-8')
sys.stdout = stdout

# Module à importer
import schedule
import time


# Définissez ici le processus pour qu'il s'exécute à intervalles réguliers


def job():
       
    print("********************************")
 print ("exécuter le JOB spécifié")
    print("********************************")
    
    from datetime import datetime
 print ("Date et heure de début du JOB:" + datetime.now (). Strftime ("% Y /% m /% d% H:% M:% S"))
                           
    JOB_SQL = "SELECT DISTINCT * FROM Test_Table WHERE DATE_SUB(NOW(),INTERVAL 60 SECOND) AND Category = 'Test_Category' ORDER BY ts"
    print JOB_SQL
    
    print("********************************")
 print ("Le JOB spécifié a été exécuté")
    print("********************************")
    print


# Partie principale d'ici


def main():
    
 # Paramétrage des variables à utiliser
    Loop_Count = 3
    Count = 0
    
    Interval_Time = 60
    
 # Heure de début de l'ensemble du processus
    from datetime import datetime
 print ("Date et heure de début du programme:" + datetime.now (). Strftime ("% Y /% m /% d% H:% M:% S"))
    print

 # Toutes les 10 minutes
    # schedule.every(10).minutes.do(job)
 # Toutes les 2 heures
    # schedule.every(2).hours.do(job)
 # 10 heures tous les jours
    # schedule.every().day.at("10:00").do(job)
 # tous les lundis
    # schedule.every().monday.do(job)
   
    schedule.every(Interval_Time).seconds.do(job)
    
 # Processus en boucle infinie
    while True:
    
        schedule.run_pending()
 
 #Sortiment mystérieux ... ww
        time.sleep(Interval_Time)
        
 # Vérifiez le nombre de fois spécifié
        if (Count >= Loop_Count):
            
            break
            
        else:
            
            Count += 1
       
 print (str (Count) + "times: Le nombre spécifié de travaux est terminé!")
    print
    from datetime import datetime
 print ("Date et heure de fin du programme:" + datetime.now (). Strftime ("% Y /% m /% d% H:% M:% S"))
       
if __name__ == "__main__":
    main()

Nous vérifierons l'opération à la hâte.

image.png

Créer une table de données originale sur MemSQL

Ensuite, créez une table pour insérer les données d'origine sur MemSQL. C'est aussi une force, mais en Python, définissez-la comme suit.


 coding: utf-8

# Créer une table de données de ventes centralisée sur MemSQL
# Version 2.7


# Réglage initial
import sys
stdout = sys.stdout
reload(sys)
sys.setdefaultencoding('utf-8')
sys.stdout = stdout

import time

# Vous pouvez en profiter avec la compatibilité MySQL de MemSQL!
import pymysql.cursors

# Initialiser une table existante
Table_Init = "DROP TABLE IF EXISTS Qiita_Test"
Table_Make = "CREATE TABLE IF NOT EXISTS Qiita_Test" 
    
# Définition de la table (répertoriée dans Ayer! Pour le moment)
DC0 = "id BIGINT AUTO_INCREMENT, ts TIMESTAMP(6), PRIMARY KEY(id, ts), "
DC1 = "Category VARCHAR(20), Product VARCHAR(20), Price INT, Units INT, "
DC2 = "Card VARCHAR(40), Number VARCHAR(30), Payment INT, Tax INT, "
DC3 = "User VARCHAR(20), Zip VARCHAR(10), Prefecture VARCHAR(10), Address VARCHAR(60), Tel VARCHAR(15), Email VARCHAR(40)"

# Commencer le traitement
print("****************************************************")
 print ("Créer une table de ventes centralisée pour validation sur MemSQL")
print("****************************************************")

from datetime import datetime
 print ("Date et heure de début du processus de création de table:" + datetime.now (). Strftime ("% Y /% m /% d% H:% M:% S"))
print

try:

 #Connectez-vous avec MemSQL
    db = pymysql.connect(host = 'xxx.xxx.xxx.xxx',
                         port=3306,
                         user='qiita',
                         password='adminqiita',
                         db='Test',
                         charset='utf8',
                         cursorclass=pymysql.cursors.DictCursor)
    
    with db.cursor() as cursor:
        
 #Initialiser la table existante
        cursor.execute(Table_Init)
        db.commit()
        
 # Créer une nouvelle table
        cursor.execute(Table_Make+"("+DC0+DC1+DC2+DC3+")" )    
        db.commit() 
    
except KeyboardInterrupt:
    print("************************")
 print ('!!!!! Une interruption s'est produite !!!!!')
    print("************************")
    print

finally:
 #Close connexion à la base de données
    db.close()
    print("****************************************************************")
 print ("Créer une table de ventes centralisée pour validation sur MemSQL terminé")
    print("****************************************************************")
 print ("Date et heure de fin du processus de création de table:" + datetime.now (). Strftime ("% Y /% m /% d% H:% M:% S"))
    print

Générer et insérer de force des données de vérification à l'aide de Faker

Cette vérification utilise également le Python Faker classique pour générer des données "coquines". Nous nous sommes préparés comme suit par force brute, en donnant la priorité au fait que les données sont générées à la hâte. Dans la vérification réelle, tout en exécutant ce script en continu, tout en développant la table principale sur MemSQL, lancez des modèles SQL sur cette table et traitez-la pour obtenir le résultat ... Je le ferai.


 coding: utf-8

# Insérez en continu des données dans une table de vente centralisée sur MemSQL
# Version 2.7


# Réglage initial
import sys
stdout = sys.stdout
reload(sys)
sys.setdefaultencoding('utf-8')
sys.stdout = stdout

import time
import pymysql.cursors
import re

# Paramètres d'instruction utilisés dans SQL
SQL_Head = "INSERT INTO Qiita_Test"

# Définition des métadonnées utilisée pour la validation
 Category_Name = ["Liqueur", "Appareils ménagers", "Livres", "DVD / CD", "Produits divers"]
 Product_Name0 = ["Saké japonais", "Bourbon", "Bière", "Imo shochu", "Vin rouge", "Vin blanc", "Scotch", "Brandy", "Awamori", "Tequila"]
Product_Price0 = [1980, 2500, 490, 2000, 3000, 2500, 3500, 5000, 1980, 2000]    
 Product_Name1 = ["TV", "Machine à laver", "Radio", "Stéréo", "Micro-ondes", "PC", "Batterie", "Climatiseur", "Sèche-linge", "Aspirateur"]
Product_Price1 = [49800, 39800, 2980, 88000, 29800, 64800, 198, 64800, 35800, 24800]    
 Product_Name2 = ["Magazine hebdomadaire", "Histoire", "Collection de photos", "Dessin animé", "Livre de référence", "Fiction", "Économie", "Développement personnel", "Magazine mensuel", "Nouveau numéro"]
Product_Price2 = [280, 1500, 2500, 570, 1480, 1400, 1800, 1540, 980, 1980]    
 Product_Name3 = ["Musique occidentale", "Enka", "J pop", "Film occidental", "Idole", "Classique", "Film japonais", "Série dramatique", "Planification", "Anime"]
Product_Price3 = [1980, 2200, 2500, 3500, 2980, 1980, 3800, 2690, 1980, 2400]    
 Product_Name4 = ["Détergent", "Ampoule", "Cadeau", "Produits non médicinaux", "Aliments pour animaux domestiques", "Piles sèches", "Papeterie", "Produits pour hommes", "Produits pour femmes", "Produits saisonniers" ]
Product_Price4 = [498, 198, 1980, 398, 980, 248, 398, 2980, 3580, 1980]


# Colonne de données pour l'écriture (combinée avec la génération de table)
DL1 = "Category, Product, Price, Units, "
DL2 = "Card, Number, Payment, Tax, "
DL3 = "User, Zip, Prefecture, Address, Tel, Email"

# Lancer le traitement de la démo
print("********************************************************************")
 print ("Lancer la génération automatique et l'insertion des données dans la table de vente centralisée sur MemSQL")
print("********************************************************************")
print

from datetime import datetime
 print ("Date et heure de début du processus d'insertion des données:" + datetime.now (). Strftime ("% Y /% m /% d% H:% M:% S"))
print

try:

 # Paramètre de la fonction de génération automatique de données Python
    from faker.factory import Factory
    Faker = Factory.create
    fakegen = Faker()
    fakegen.seed(0)
    fakegen = Faker("ja_JP")
    
 #Diverses définitions de variables
 # Nombre total de données à générer -> Modifiez ici le cas échéant
    ##########################################################
    Loop_Count = 10000
    ##########################################################
 # Drapeau de réglage de la synchronisation (0: pas de temps d'attente 1: 1 seconde 2: aléatoire)
    ##########################################################
    Wait_Flag = 2
    ##########################################################
 # J'ai fourni quelques options pour le rendre plus démo plus tard.
    
 # À intervalles réguliers (en secondes dans l'heure système)
    Sleep_Wait = 1
 # Intervalle aléatoire (ajusté en fonction de la situation réelle)
    Base_Count = 500000
    
 #Autres variables
    Counter = 0
    Work_Count = 1

 #Connectez-vous avec MemSQL
    db = pymysql.connect(host = 'xxx.xxx.xxx.xxx',
                         port=3306,
                         user='qiita',
                         password='adminqiita',
                         db='Test',
                         charset='utf8',
                         cursorclass=pymysql.cursors.DictCursor)
    
    with db.cursor() as cursor:
    
 #Génération de données de vérification
        while Counter < Loop_Count:
            
 # Sélectionnez le type de produit à écrire au hasard (cette fois, il y a 5 types, donc convertissez de force en 0-4)
            Category_ID = fakegen.random_digit()
            if Category_ID > 4: Category_ID = Category_ID - 5
            
 # Réglage du nom de la catégorie
            Category = Category_Name[Category_ID]

 # Sélectionnez 10 types de produits dans chaque catégorie
            Product_ID = fakegen.random_digit()
           
 # Sélectionnez les informations de colonne qui remplissent les conditions
 # Le nombre est ajusté comme ça
            if Category_ID == 0:
                Product = Product_Name0[Product_ID]
                Price = Product_Price0[Product_ID]
                Units = fakegen.random_digit() + 1
                
            elif Category_ID == 1:
                Product = Product_Name1[Product_ID]
                Price = Product_Price1[Product_ID]
                Units = 1
                
            elif Category_ID == 2:
                Product = Product_Name2[Product_ID]
                Price = Product_Price2[Product_ID]
                Units = fakegen.random_digit() + 1
                if Units >3: Units = 3  
                    
            elif Category_ID == 3:
                Product = Product_Name3[Product_ID]
                Price = Product_Price3[Product_ID]
                Units = fakegen.random_digit() + 1
                if Units >2: Units = 2
                    
            else:
                Product = Product_Name4[Product_ID]
                Price = Product_Price4[Product_ID]
                Units = fakegen.random_digit() + 1
                if Units >4: Units = 4
                
 #Informations de paiement
            if str(fakegen.pybool()) == "True":
 Carte = "cash"
            else:
                Card = fakegen.credit_card_provider()
    
            Number = fakegen.credit_card_number()               
 si Carte == "cash": Number = "N / A"
            
 # Calculer le paiement total et la taxe à la consommation
            Payment = Price * Units
            Tax = Payment * 0.1

 # Génération d'informations sur l'acheteur
            User = fakegen.name()
            Zip = fakegen.zipcode()
            Address = fakegen.address()
            
 # Extraction des informations de la préfecture
             pattern = u"Tokyo|Hokkaido|(?:Kyoto|Osaka)Préfecture|.{2,3}Préfecture"
            m = re.match(pattern , Address)
            if m:
                Prefecture = m.group()
            
            Tel = fakegen.phone_number()
            Email = fakegen.ascii_email()
       
 # Écrivez d'ici à la table de régulation de chaque base de données
            DV1 = Category+"','"+Product+"','"+str(Price)+"','"+str(Units)+"','"
            DV2 = Card+"','"+Number+"','"+str(Payment)+"','"+str(Tax)+"','"
            DV3 = User+"','"+Zip+"','"+Prefecture+"','"+Address+"','"+Tel+"','"+str(Email)
        
            SQL_Data = SQL_Head +"("+DL1+DL2+DL3+") VALUES('"+DV1+DV2+DV3+"')"
            
            cursor.execute(SQL_Data)
            db.commit()
                        
            
 #Affichez les données générées sur la console (commentez si inutile)
            print SQL_Data
            print
            
 #Ajustement de l'intervalle de génération
            if Wait_Flag == 1:
                time.sleep(Sleep_Wait)
            elif Wait_Flag == 2:
                Wait_Loop = Base_Count * fakegen.random_digit() + 1
                for i in range(Wait_Loop): Work_Count = Work_Count + i
        
 #Update compteur de boucle
            Counter=Counter+1

except KeyboardInterrupt:
    print("************************")
 print ('!!!!! Une interruption s'est produite !!!!!')
    print("************************")
    print

finally:
 #Close connexion à la base de données
    db.close()
    print("**************************************")
 print ("Nombre total de données générées:" + str (compteur))
    print("**************************************")
    print
    print("************************************************************************")
 print ("Génération automatique et insertion de données dans la table de vente centralisée sur MemSQL terminée")
    print("************************************************************************")
 print ("Date et heure de fin du processus d'insertion des données:" + datetime.now (). Strftime ("% Y /% m /% d% H:% M:% S"))
    print


Vérifiez le fonctionnement au cas où.

image.png

Vérifiez également le contenu du tableau.

image.png

Effectuer une vérification urgente

Modifiez la partie ** job () ** du premier script préparé comme suit. (La partie qui gère le retour de la requête est conçue pour une vérification ultérieure.)


 # Lancer le traitement
    from datetime import datetime
 print ("Date et heure d'exécution du JOB:" + datetime.now (). Strftime ("% Y /% m /% d% H:% M:% S"))
    print
           
 #Connectez-vous avec MemSQL
    db = pymysql.connect(host = 'xxx.xxx.xxx.xxx',
                         port=3306,
                         user='qiita',
                         password='adminqiita',
                         db='Test',
                         charset='utf8',
                         cursorclass=pymysql.cursors.DictCursor)
    
    with db.cursor() as cursor:
        
 # Définir une requête pour compter le nombre d'insertions de données urgentes
        SQL_Data = "SELECT Count(*) FROM Qiita_Test"
        
 # Envoyer la requête et valider
        cursor.execute(SQL_Data)                    
        db.commit()
        
 # Obtenir les résultats de la requête
        rows = cursor.fetchall()
        
 #Initialiser le tampon de travail
        Tmp_Data = []
        
 #Le contenu de la requête augmentera à l'avenir, alors préparez-vous à cela ...
        for Query_Data in rows:
                    
            for item in Query_Data.values():
                
                Tmp_Data.append(item)
                
 print ("Nombre de données à ce stade:" + str (Tmp_Data [0]))
                print
        
         
   
    db.close()

Dans cette vérification, la variable est définie pour être exécutée 3 fois toutes les 5 secondes. Pour l'intervalle de génération de données, définissez un intervalle aléatoire visant un petit œil réel, démarrez-le d'abord, puis exécutez le script modifié sans interruption.

image.png

Il semble que cela a fonctionné en toute sécurité, donc je vais laisser la préparation pour cette fois ici. La prochaine fois, j'essaierai d'augmenter le contenu de la table de traitement régulière et du ** job () ** créé cette fois.

** Continuer vers la vérification de la capacité de traitement MemSQL (application 2) ... **

Merci

Cette vérification est effectuée à l'aide de la version gratuite officielle (V6) de MemSQL.

** À propos de la version gratuite de MemSQL ... **

Nous tenons à remercier MemSQL d'avoir fourni cette précieuse opportunité, et si ce contenu diffère du contenu publié sur le site officiel de MemSQL, les informations de MemSQL prévaudront. Veuillez comprendre cela.

Recommended Posts

Vérification de la capacité de traitement MemSQL (application 2)
Vérification de la capacité de traitement MemSQL (Application 1)
Vérification de la capacité de traitement MemSQL (application 3)