Dieses Mal möchte ich die Verarbeitung für eine MemSQL-Tabelle, in die kontinuierlich Daten eingefügt werden, in regelmäßigen Abständen kontinuierlich ausführen, Daten extrahieren, die in den angegebenen Zeitbereich fallen, und eine Tabelle generieren. .. Da es sich um eine Überprüfung in einer Situation handelt, in der die Zeit verschoben wird, ist es nicht zu leugnen, dass sie etwas locker ist. Andererseits hat MemSQL, das mehrere Einfügeprozesse pro Sekunde ausführt, nacheinander eine weitere Prozessanforderung erhalten Um zu bestätigen, um welche Art von Bewegung es sich handelt, werde ich es vorerst versuchen.
Machen Sie nicht alles zusammen! In der Welt der Formen gibt es eine Stapelverarbeitung, aber dieses Mal gibt es eine Lösung namens Equalum, die Echtzeit-Streaming erstellen kann, das lange Zeit ohne Programm verifiziert und eingeführt wurde. Deshalb haben wir eine Zusammenarbeit mit ihm angenommen ** " Welches MemSQL ist die Umgebung, um zu realisieren, "ohne als Transaktionsdieb bezeichnet zu werden", "sich keine Sorgen um die Zeit zu machen", "wie Sie möchten" und "kreative Abfrageverarbeitung" ** für das vorhandene ursprüngliche Datensystem. Kannst du gehen? Ich würde gerne ... sehen.
Ändern Sie zunächst das vorherige Python-Skript in einen Prozess, der regelmäßig ausgeführt wird. (1) Erstellen Sie eine neue Tabelle, indem Sie einen Tabellennamen mit den Datums- und Uhrzeitinformationen des Startzeitpunkts generieren (2) Extrahieren Sie die erforderlichen Informationen aus der kontinuierlich eingefügten Originaltabelle, indem Sie den Zeitbereich angeben (3) Speichern Sie die zurückgegebenen Informationen in der neuen Tabelle, die in (1) erstellt wurde. Ich werde in den Prozess setzen. (Auch diesmal ist es möglicherweise voller Tsukkomi-Plätze, da es sich aufgrund von Stromarbeiten um eine Ayer-Version handelt, aber dieser Bereich ist absolut verzeihend ...)
Das diesmal verwendete SQL ist übrigens wie folgt.
SELECT ts, Category, Product, Price, Units, Card, Payment, Prefecture
FROM Qiita_Test
WHERE ts BETWEEN 'YYYY/mm/dd HH:MM:SS.000000' AND 'YYYY/mm/dd HH:MM:SS.000000'
SORDER BY ts;
Die Überprüfung wurde unter der Annahme durchgeführt, dass die diesmal festgelegte Reihe von Prozessen alle 30 Sekunden ausgeführt wird und die Daten von diesem Punkt bis 30 Sekunden zuvor als Bedingungen extrahiert werden. (Tatsächlich ist es aufgrund anderer Einflüsse leicht falsch ausgerichtet ... (Schweiß)) Die Last der Bedingungsextraktion und der Erstellung neuer Tabellen wird angewendet, indem die kontinuierlich eingefügte Tabelle regelmäßig unterbrochen wird. Wenn wir überprüfen können, was passiert, wenn wir es anrufen, werden wir mit der Arbeit fortfahren, indem wir vorerst OK sagen.
coding: utf-8
# Führen Sie Aufgaben in regelmäßigen Abständen in Python aus (Power-Technik-Version).
# Version 2.7
import sys
stdout = sys.stdout
reload(sys)
sys.setdefaultencoding('utf-8')
sys.stdout = stdout
# Modul zum Importieren
import schedule
import time
import pymysql.cursors
# In SQL-Anweisungen verwendete Informationen
SQL1 = "SELECT ts, Category, Product, Price, Units, Card, Payment, Prefecture FROM Qiita_Test "
SQL3 = " ORDER BY ts"
# Definition der Snapshot-Tabellenspalte
DC0 = "id BIGINT AUTO_INCREMENT, PRIMARY KEY(id), O_ts DATETIME(6), "
DC1 = "Category VARCHAR(20), Product VARCHAR(20), Price INT, Units INT, "
DC2 = "Card VARCHAR(40), Payment INT, Prefecture VARCHAR(10)"
# Spaltendefinition zum Schreiben in SQL
DL1 = "O_ts, Category, Product, "
DL2 = "Price, Units, Card, Payment, "
DL3 = "Prefecture"
# Stellen Sie hier den Prozess so ein, dass er in regelmäßigen Abständen ausgeführt wird
def job():
# Zeiteinstellung, um vom Zeitpunkt des Schnappschusses zurückzukehren
Time_Int = 30
Time_Adj = 0
from datetime import datetime, date, time, timedelta
# Aktuelle Datums- und Uhrzeitinformationen abrufen
now = datetime.now()
print ("JOB starten:" + now.strftime ("% Y /% m /% d% H:% M:% S"))
# Generieren Sie einen Tabellennamen für den Snapshot und generieren Sie eine SQL-Anweisung
dt = 'Qiita_' + now.strftime('%Y%m%d_%H%M%S')
Table_Make = "CREATE TABLE IF NOT EXISTS " + dt
SQL_Head = "INSERT INTO " + dt
# Endinformationen zur in SQL verwendeten Zeiteinstellung (Mit Time_Adj anpassen, wenn eine Korrektur erforderlich ist)
pre_sec = now - timedelta(seconds = Time_Int + Time_Adj)
from_dt = pre_sec.strftime("%Y/%m/%d %H:%M:%S")
#Startinformationen zur in SQL verwendeten Zeiteinstellung (Anpassung mit Time_Adj und + -if Korrektur erforderlich)
now_sec = now + timedelta(seconds = Time_Adj)
to_dt = now_sec.strftime("%Y/%m/%d %H:%M:%S")
# SQL-Anweisung mit Zeitbereich generieren
SQL2 = "WHERE ts BETWEEN '" + from_dt + ".000000' AND '" + to_dt + ".000000'"
SQL_Def = SQL1 + SQL2 + SQL3
#Verbinden Sie sich mit MemSQL
db = pymysql.connect(host = 'xxx.xxx.xxx.xxx',
port=3306,
user='qiita',
password='adminqiita',
db='Test',
charset='utf8',
cursorclass=pymysql.cursors.DictCursor)
with db.cursor() as cursor:
cursor.arraysize = 1000
#Erstellen Sie eine neue Tabelle für Schnappschüsse
cursor.execute(Table_Make +"("+DC0+DC1+DC2+")" )
db.commit()
#Initialize Arbeitspuffer
Tmp_Data = []
# SQL zur Abfrage senden und festschreiben
cursor.execute(SQL_Def)
db.commit()
# Abfrageergebnisse abrufen
rows = cursor.fetchall()
# Abfrageergebnisse abrufen
for Query_Data in rows:
for item in Query_Data.values():
Tmp_Data.append(item)
# In jeder Spalte reflektieren
Category = str(Tmp_Data[0])
Product = str(Tmp_Data[1])
Price = str(Tmp_Data[2])
O_ts = str(Tmp_Data[3])
Units = str(Tmp_Data[4])
Prefecture = str(Tmp_Data[5])
Payment = str(Tmp_Data[6])
Card = str(Tmp_Data[7])
#Erstellen Sie SQL und speichern Sie es in der Snapshot-Tabelle
DV1 = O_ts + "','" + Category + "','" + Product + "','"
DV2 = Price + "','" + Units + "','" + Card + "','" + Payment + "','"
DV3 = Prefecture
SQL_Data = SQL_Head + "("+DL1+DL2+DL3+") VALUES('"+DV1+DV2+DV3+"')"
# Für Schnappschuss in Tabelle einfügen
cursor.execute(SQL_Data)
db.commit()
Tmp_Data = []
# Trennen Sie die Datenbankverbindung
db.close()
print ("Ende des Jobs:" + datetime.now (). strftime ("% Y /% m /% d% H:% M:% S"))
print ("+++++++++++++++++++++++++++++++")
print
# Hauptteil von hier
def main():
# Einstellung der zu verwendenden Variablen
Loop_Count = 3
Count = 0
Interval_Time = 30
# Startzeit des gesamten Prozesses
from datetime import datetime
print ("Datum und Uhrzeit des Programmstarts:" + datetime.now (). Strftime ("% Y /% m /% d% H:% M:% S"))
print
# Alle 10 Minuten
# schedule.every(10).minutes.do(job)
# Alle 2 Stunden
# schedule.every(2).hours.do(job)
# 10 Uhr jeden Tag
# schedule.every().day.at("10:00").do(job)
# jeden Montag
# schedule.every().monday.do(job)
schedule.every(Interval_Time).seconds.do(job)
# In einer Endlosschleife verarbeiten
while True:
schedule.run_pending()
#Mysterischer Zauber ... ww
time.sleep(Interval_Time)
# Überprüfen Sie die angegebene Anzahl von Malen
if (Count >= Loop_Count):
break
else:
Count += 1
print
print (str (Count) + "times: Die angegebene Anzahl von Jobs wurde abgeschlossen!")
print
from datetime import datetime
print ("Programmenddatum und -zeit:" + datetime.now (). Strftime ("% Y /% m /% d% H:% M:% S"))
if __name__ == "__main__":
main()
Das Ergebnis der Ausführung dieses Skripts ist wie folgt.
Datum und Uhrzeit des Programmstarts: 2020/10/12 14:31:35
Beginn des Jobs: 2020/10/12 14:32:05
Ende des Jobs: 2020/10/12 14:32:06
+++++++++++++++++++++++++++++++
Beginn des Jobs: 2020/10/12 14:32:36
Ende des Jobs: 2020/10/12 14:32:37
+++++++++++++++++++++++++++++++
Beginn des Jobs: 2020/10/12 14:33:07
Ende des Jobs: 2020/10/12 14:33:09
+++++++++++++++++++++++++++++++
3 mal: Die angegebene Anzahl von Jobs wurde abgeschlossen!
Datum und Uhrzeit des Programmendes: 2020/10/12 14:33:39
Wenn es wie erwartet funktioniert, sollten Sie drei Tabellen mit Startzeitinformationen zu MemSQL haben. Zunächst werde ich es jedes Mal mit dem bekannten ** DBeaver ** überprüfen.
Es scheint, dass es zusammen mit der Tabelle ** Qiita_Test ** der ursprünglichen Tabelle erfolgreich abgeschlossen wurde. Für alle Fälle überprüfen wir den Anfang und das Ende jeder Tabelle.
Aufgrund der Reihenfolge, in der das Einfügeskript und dieses Skript gestartet wurden, ist die anfängliche Datenzeit etwas spät, aber vorerst scheint die Extraktion des Anstiegs gut verarbeitet zu sein.
Als nächstes vergleichen wir die Endteile.
Die letzten Daten, TIMESTAMP (6), sind 14: 32: 04: 804694, sodass Sie sehen können, dass sie wie erwartet passen.
Überprüfen Sie auch die anderen beiden.
Ich habe die Daten zwischen den Verarbeitungsintervallen verpasst, aber der definierte Zeitbereich wird abgedeckt. (Ich muss die Logik ausarbeiten ... (Schweiß))
Es ist sicher in Reichweite.
Der letzte Tisch ist ...
Nach wie vor konnten wir nicht 2-3 Daten erfassen, die an die Grenze der Zeitberechnung im Verarbeitungsprozess passen, aber es scheint, dass sie innerhalb des in der SQL-Anweisung definierten Zeitbereichs liegen.
Wenn ich die Zeit mit etwas mehr Sorgfalt steuern kann, denke ich, dass sich die Genauigkeit der Datennutzung in einem präziseren und kürzeren Zyklus verbessern wird, aber mit Hilfe der dringenden In-Memory-Leistung habe ich einen großen Prozess eingeführt (zum Beispiel habe ich ihn zuvor eingeführt). Selbst wenn MemSQL unter Verwendung von Equalum ausgeführt wird, um den bei dieser Überprüfung durchgeführten kontinuierlichen Einfügungsteil zu ersetzen, ist es möglich, Daten frei zu extrahieren und die Arbeit des nächsten Prozesses mit einer Leistung im Millisekundenbereich auszuführen. Ich hoffe du verstehst, dass es extrem realistisch ist.
In einem kurzen Verarbeitungszyklus sind grundsätzlich nur wenige Daten zu verarbeiten, sodass es möglich ist, die Datenextraktions- / Aggregationsverarbeitung auf simulierte Weise durchzuführen, während eine Situation frei angenommen wird, die der Gegenwart sehr nahe kommt. Don alle zusammen in einer Mitternachtscharge! Da dies nicht der Fall ist, werden alle Daten, die bis zu diesem Zeitpunkt verarbeitet werden können, aus der Datenquelle extrahiert und vorverarbeitet, die mit Equalum die Wand des vorgelagerten Silos überquert, und der Speicherplatz von MemSQL erweitert ( Zu diesem Zeitpunkt werden die Informationen im Silo flach in dem von derselben Datenbank verwalteten Speicherplatz verteilt, und es wird einfach, die Daten in einer für das Silo transparenten, übergreifenden Weise zu verwenden.
Selbst wenn Sie eine Extraktionsabfrage (einschließlich dieser Zeitangabe) für eine Tabelle festlegen, die 100.000 Zeilen mit Informationen enthält, die auf der ursprünglichen Seite von MemSQL erweitert wurden, können Sie natürlich auch einige Bedingungen festlegen. Da die Leistung im Millisekundenbereich stabil gehalten werden kann, sollten Sie in einer Situation, in der der Speicherplatz für die Speichererweiterung voraussichtlich groß ist, die erforderlichen IA-Server gruppieren und Daten vollständig im Speicher verarbeiten. Es ist möglich durchzuführen.
Die Tabelle, die dieses Mal durch Stapelverarbeitung extrahiert wurde, kann wie die zuvor eingeführte Excel-Tabelle gelesen werden, sodass sie mithilfe der Funktionen in Excel als BI verwendet werden kann und die hohe MySQL-Kompatibilität von MemSQL erreicht wird. Es wird möglich sein, es sofort als Quellmaterial für andere BI / AI- und andere Lösungen zu verwenden, die verwendet wurden. ** Verwenden wir MemSQL Vol.6: Derailment ** kann hilfreich sein.
Beim nächsten Mal möchte ich einige externe Verknüpfungen mit der diesmal extrahierten Tabelle ausprobieren. Diese Arbeit steht nicht in direktem Zusammenhang mit MemSQL, aber ich beschäftige mich etwas eingehender mit der "operativen Datenbank" (ich frage mich, ob sie nicht relational ist ...), die MemSQL als Nachricht veröffentlicht, und den jüngsten modernen Transaktionen. Überprüfen Sie die Zusammenarbeit mit der Systemdatenverarbeitung.
** Vorhandenes Datensystem >> Equalum >> MemSQL >> BI / AI / Roboter etc .. ** ** Relational <<<<<<<<<<<<< >>>>>>>> Operational **
Ich möchte einen Blick in die Welt von werfen.
Diese Überprüfung wird mit der offiziellen kostenlosen Version (V6) von MemSQL durchgeführt.
** Über die kostenlose Version von MemSQL ... **
Wir möchten MemSQL für die Bereitstellung dieser wertvollen Gelegenheit danken. Wenn sich dieser Inhalt von dem auf der offiziellen Website von MemSQL veröffentlichten Inhalt unterscheidet, haben die Informationen von MemSQL Vorrang. Bitte haben Sie Verständnis dafür.