[PYTHON] Azure Cosmos DB ab 2020 - Azure Synapse Link-Demo (Teil 1)

Einführung

Dieser Artikel ist ein Beispiel für einen Erklärungsartikel von Azure Synapse Link für Azure Cosmos DB, der auf GitHub GitHub veröffentlicht wurde. Die Beschreibung der Hauptfamilie ist in englischer Sprache. [Hier] Die japanische Übersetzung ist im Repository GitHub-ymasaoka verfügbar. Greifen Sie also auf das zu, was Sie möchten.

--Honke (Englisch): [Azure-Samples / Synapse] GitHub

Der Beispiel-Azure-Synapse-Link für Azure Cosmos DB lautet / Notebooks / PySpark / Synapse-Link für Cosmos DB-Beispiele /. Es ist in Synapse% 20Link% 20 für% 20Cosmos% 20DB% 20samples verfügbar. Wenn Sie die Details überprüfen möchten, überprüfen Sie bitte auch hier.

Umgebung

Die folgende Umgebung ist erforderlich, um dieses Beispiel auszuführen.

In den folgenden Artikeln erfahren Sie, wie Sie eine Umgebung erstellen.

Szenario 1 - Internet der Dinge (IoT)

Dieses Mal habe ich Azure Synapse Spark verwendet, um Streaming- und Batch-IoT-Daten in Azure Cosmos DB zu bringen, Azure Synapse Link zum Durchführen von Verknüpfungen und Aggregationen verwendet und Azure Cognitive Services für Spark (MMLSpark) verwendet. Anomaliedetektor] Bis Sie [Anomaliedetektor] ausführen. [Anomaly Detector] [Anomaly Detector] ist ein Azure-Dienst (Vorschau vom 18. August 2020), mit dem Sie Anomalieerkennungsfunktionen problemlos in Ihre App integrieren können.

Dieser Fluss

Schreiben Sie IoT-Streaming-Daten und Batch-Daten mithilfe des Azure Synapse Analytics Spark-Pools in den Cosmos DB-Transaktionsspeicher. Danach werden die Daten, die automatisch vom Transaktionsspeicher zum Analysespeicher synchronisiert werden, von Azure Synapse Link für Azure Cosmos DB referenziert, und die Daten werden in Azure Synapse Analytics kombiniert und aggregiert.

dataflow.PNG

Vorbereitungen

[/ Notebooks / PySpark / Synapse Link für Cosmos DB-Beispiele / IoT / README.md](https://github.com/ymasaoka/Synapse/blob/ja-jp/Notebooks/PySpark/Synapse%20Link%20for%20Cosmos% Bereiten Sie die Daten vor und stellen Sie sie gemäß dem Inhalt von 20DB% 20samples / IoT / README.md) ein.

Hier gibt es fünf Dinge zu tun:

Laden Sie die Datei IoTDeviceInfo.csv hoch

Die Datei "IoTDeviceInfo.csv" lautet / Notebooks / PySpark / Synapse Link für Cosmos DB-Beispiele / IoT / IoTData 20Link% 20for% 20Cosmos% 20DB% 20samples / IoT / IoTData) Es ist unten. Rufen Sie die Datei ab und laden Sie sie über den Azure Synapse-Arbeitsbereich auf Azure Data Lake Storage hoch.

Erstellen Sie zunächst einen ** IoTData ** -Ordner. Wählen Sie die Registerkarte ** Daten / Verknüpft ** aus, wählen Sie die Azure Data Lake Storage Gen 2-Umgebung aus, die an die Azure Synapse Analytics-Umgebung angehängt ist, und wählen Sie ** + Neuer Ordner **, um einen neuen Ordner zu erstellen. Ich werde.

スクリーンショット 2020-08-18 22.37.34.png

スクリーンショット 2020-08-18 22.40.27.png

Platzieren Sie nach dem Erstellen des IotData-Ordners die Datei "IoTDeviceInfo.csv" im Ordner "IoTData".

スクリーンショット 2020-08-18 22.44.23.png

IAM-Einstellungen (Storage Account Access Control)

Fügen wir der Registerkarte Zugriffssteuerung (IAM) eine Rolle hinzu.

スクリーンショット 2020-08-18 23.03.22.png

Die zuzuweisende Rolle sollte ** Storage BLOB Data Co-Creator ** sein. Fügen Sie Ihre eigenen Benutzer hinzu und fügen Sie Rollen hinzu.

スクリーンショット 2020-08-18 23.09.50.png

Aktivieren Sie Azure Synapse Link

Wenn Sie Azure Synapse Link nicht aktivieren, können Sie keinen Azure Cosmos DB-Analysespeicher erstellen. Aktivieren Sie ihn daher. Klicken Sie im Daten-Explorer auf die Schaltfläche ** Azure Synapse Link aktivieren **, um Azure Synapse Link zu aktivieren.

スクリーンショット 2020-08-18 23.16.46.png

スクリーンショット 2020-08-18 23.17.13.png

Datenbanken und Container erstellen

Erstellen Sie die Datenbank und den Container gemäß den obigen Informationen. GitHub sagt, dass beim Erstellen einer CosmosDBIoTDemo-Datenbank ** die Einstellung für die automatische Skalierung aktiviert und mit 4000 RU / s erstellt wird **, aber bis die eigentliche Verarbeitung beginnt, sind es 400 RU / s ohne automatische Skalierung (manuell). Es gibt kein Problem beim Erstellen. Dies kann später geändert werden. Die Partitionsschlüssel für den Container sind beide / id. Stellen Sie sicher, dass ** Analytical Store ** * On * ist, wenn Sie jeden Container erstellen.

Erstellen Sie einen Verbindungsdienst für Azure Cosmos DB

Bitte beachten Sie hierzu den zu Beginn eingeführten Artikel. Der Name des Verbindungsdienstes lautet "CosmosDBIoTDemo".

Nachdem Sie den verknüpften Dienst erstellt haben, sollten Sie die Cosmos DB-Informationen im Azure Synapse-Arbeitsbereich wie folgt anzeigen können:

スクリーンショット 2020-08-18 23.32.39.png

01-CosmosDBSynapseStreamIngestion

Dies ist das erste Notizbuch. Hier verwenden wir ** strukturiertes Streaming, um Streaming-Daten in die Azure Cosmos DB-Sammlung ** zu bringen. Wie die IoTDeviceInfo.csv-Datei lautet die .ipynb-Datei GitHub. Da es sich in spark-notebooks / pyspark / 01-CosmosDBSynapseStreamIngestion.ipynb befindet, holen Sie es ab und importieren Sie es aus ** Develop ** in den Azure Synapse-Arbeitsbereich.

スクリーンショット 2020-08-18 23.41.17.png

Wenn der Import abgeschlossen ist, sollte Ihr Notizbuch wie unten gezeigt angezeigt werden. Stellen Sie nun sicher, dass ** Anhängen an ** auf den vorgefertigten Spark-Pool und ** Sprache ** auf PySpark eingestellt ist.

スクリーンショット 2020-08-18 23.42.31.png

Sobald dies erledigt ist, müssen Sie nur noch Ihr Notebook ausführen. ** Wenn Sie die Einstellungen für die automatische Skalierung von Azure Cosmos DB vor dem Ausführen geändert haben **, aktivieren Sie die automatische Skalierung **. Wenn Sie es mit 400 RU / s ohne automatische Skalierung ausführen, wird der Statuscode ** Http 429 ** zurückgegeben und Sie können die Daten nicht füllen. Informationen zur Ausführung des Codes finden Sie in der Erläuterung im Notizbuch. Wie in Readme.md beschrieben, erstellt dieses Notizbuch hauptsächlich Daten für die Beispielausführung in der zweiten Hälfte. Lassen wir es für "2 bis 5 Minuten", nachdem die Ausführung beginnt. ** beinhaltet jedoch nicht den ersten Start des Spark-Pools in dieser Zeit von 2 bis 5 Minuten. ** Die Ausführung von Spark-Pools dauert lange, da sie den Pool beim ersten Start von PySpark nach dem Leerlauf starten.

Wie in der folgenden Abbildung gezeigt, bestätigen wir, dass die Verarbeitung von Zelle 3 und Zelle 5 abgeschlossen ist und die Verarbeitung von Zelle 7 weiterhin funktioniert, und verlassen Sie sie dann. Diese ** vernachlässigte Arbeit ist ein Reproduktionsbeispiel für den Empfang und die Speicherung von IoT-Streaming-Daten **.

スクリーンショット 2020-08-18 23.53.27.png

スクリーンショット 2020-08-18 23.54.34.png

Überprüfen Sie im Laufe der Zeit, ob die Daten im Container "IoTSignals" von Azure Cosmos DB ausgefüllt wurden. ** Die Einstellungen für die automatische Skalierung können wiederhergestellt werden und sind in Ordnung **.

スクリーンショット 2020-08-19 0.12.09.png

スクリーンショット 2020-08-19 0.13.30.png

02-CosmosDBSynapseBatchIngestion

Das nächste Notizbuch. Hier verwenden wir ** Azure Synapse Spark, um Batchdaten in die Azure Cosmos DB-Sammlung ** zu bringen. Wie bei 01 GitHub(https://github.com/ymasaoka/Synapse/blob/ja-jp/Notebooks/PySpark/Synapse%20Link%20for%20Cosmos%20DB%20samples/IoT/spark-notebooks/ Laden Sie das Notizbuch in pyspark / 02-CosmosDBSynapseBatchIngestion.ipynb) herunter, importieren Sie es in den Azure Synapse-Arbeitsbereich und führen Sie es aus.

スクリーンショット 2020-08-19 0.16.55.png

Wie Sie anhand des Inhalts von "IoTDeviceInfo.csv" sehen können, sind die Daten nur 10 Zeilen ohne den Header-Teil. Sie müssen Ihre Azure Cosmos DB-Einstellungen für die automatische Skalierung hier also nicht erneut rückgängig machen **.

Nachdem das Notizbuch ausgeführt wurde, überprüfen wir, ob die Daten in den Container "IoTDeviceInfo" in Azure Cosmos DB eingefügt wurden. Es sollten 10 Datenzeilen ordnungsgemäß vorhanden sein.

スクリーンショット 2020-08-19 0.28.52.png

03-CosmosDBSynapseJoins

Das nächste Notizbuch. Hier verwenden wir ** Azure Synapse Link, um Verknüpfungen und Aggregate in Azure Cosmos DB-Sammlungen durchzuführen **. Zunächst GitHub / 03-CosmosDBSynapseJoins.ipynb) Holen Sie sich das Notizbuch, importieren Sie es in den Azure Synapse-Arbeitsbereich und führen Sie es aus.

Ich werde hier ein wenig erklären. Zuvor wurde in einem in einen Azure Synapse-Arbeitsbereich importierten Notizbuch PySpark wie unten gezeigt ausgeführt und ** in den Azure Cosmos DB-Transaktionsspeicher geschrieben **.

streamQuery = dfIoTSignals\
                    .writeStream\
                    .format("cosmos.oltp")\
                    .outputMode("append")\
                    .option("spark.cosmos.connection.mode", "gateway") \
                    .option("spark.synapse.linkedService", "CosmosDBIoTDemo")\
                    .option("spark.cosmos.container", "IoTSignals")\
                    .option("checkpointLocation", "/writeCheckpointDir")\
                    .start()

Worauf wir hier achten wollen, ist der Teil .format (" cosmos.oltp "). Mit cosmos.oltp können Sie eine Verbindung zum Transaktionsspeicher von Azure Cosmos DB herstellen. (Es wurde auch im Teil "Wussten Sie schon?" Des Notizbuchs erwähnt.) Von hier aus greifen wir auf den Azure Cosmos DB Analysis Store zu. Daher kann cosmos.oltp nicht verwendet werden. Stattdessen verwenden wir cosmos.olap.

Darüber hinaus erstellt dieses Notizbuch eine * Spark-Tabelle, die der Auflistung des Azure Cosmos DB-Analysespeichers zugeordnet ist. Diese Spark-Tabelle ist die sogenannte ** Metadatentabelle **, die die Aufgabe hat, ** die Abfrage an die entsprechende Azure Cosmos DB Analytics-Speichersammlung ** zu übergeben, wenn die Spark SQL-Abfrage ausgeführt wird.

%%sql
create database CosmosDBIoTDemo
%%sql
create table if not exists CosmosDBIoTDemo.IoTSignals
using cosmos.olap
options(spark.synapse.linkedService 'CosmosDBIoTDemo',
        spark.cosmos.container 'IoTSignals')
%%sql
create table if not exists CosmosDBIoTDemo.IoTDeviceInfo
using cosmos.olap
options(spark.synapse.linkedService 'CosmosDBIoTDemo',
        spark.cosmos.container 'IoTDeviceInfo')
df_RPM_details = spark.sql("select a.deviceid \
                                 , b.devicetype \
                                 , cast(b.location as string) as location\
                                 , cast(b.latitude as float) as latitude\
                                 , cast(b.longitude as float) as  longitude\
                                 , a.measuretype \
                                 , a.unitSymbol \
                                 , cast(sum(measureValue) as float) as measureValueSum \
                                 , count(*) as count \
                            from CosmosDBIoTDemo.IoTSignals a \
                            left join CosmosDBIoTDemo.IoTDeviceInfo b \
                            on a.deviceid = b.deviceid \
                            where a.unitSymbol = 'RPM' \
                            group by a.deviceid, b.devicetype, b.location, b.latitude, b.longitude, a.measuretype, a.unitSymbol")

Beachten Sie, dass die automatischen Einstellungen für Azure Cosmos DB für den Zugriff auf den Azure Cosmos DB Analysis Store nicht relevant sind. Bei der automatischen Skalierung geht es um den Transaktionsspeicher, und der Zugriff auf den Transaktionsspeicher und den Analysespeicher ist unterschiedlich. Also noch einmal ** Sie müssen die Einstellungen für die automatische Skalierung von Azure Cosmos DB nicht ändern **.

Wenn der Lauf abgeschlossen ist, sollten Sie in der Lage sein, die Daten zu sehen, auf die aus der Sammlung im Azure Cosmos DB-Analysespeicher verwiesen wird, und was Sie mit diesen Daten auf der Karte grafisch dargestellt haben.

スクリーンショット 2020-08-19 0.55.38.png

スクリーンショット 2020-08-19 0.56.44.png

04-CosmosDBSynapseML

Nun zum letzten Notizbuch. Im Jahr 03 habe ich gerade die Signaldaten von der Dampfturbine des empfangenen Kraftwerks genommen und auf der Karte aufgezeichnet. Dies allein mag eine große Leistung sein, aber ich möchte einen weiteren Schritt in Richtung Nutzung von Big Data unternehmen. Schließlich führen wir ** Synapse Spark (MMLSpark) mit Azure Synapse Link und Azure Cognitive Services durch, um eine Anomalieerkennung durchzuführen **. Das Notizbuch ist das gleiche wie zuvor GitHub -notebooks / pyspark / 04-CosmosDBSynapseML.ipynb) Bitte importieren Sie es.

Durch maschinelles Lernen wird die Anzahl der Artikel erhöht, wenn sie berührt werden. Daher werde ich sie hier nicht beschreiben, sondern hier MMLSpark (Azure Cognitive Services on Spark). index.html) wird verwendet. ** Sie müssen über ein Cognitive Services-API-Konto verfügen, um dieses Notizbuch ausführen zu können, da es die Anomaly Detector-API verwendet, die Teil von Cognitive Services ist. ** _ (Sagen wir es beim Erstellen der ersten Umgebung ...) _

Erstellen Sie eine neue über das Azure-Portal. Der Anomalie-Detektor kann auch im Portal gesucht und erstellt werden. Ab dem 18.08.2020 erstellen Sie ihn bitte mit Cognitive Services.

スクリーンショット 2020-08-19 1.15.05.png

In Zelle 8 gibt es einen Teil, in den Sie den Schlüssel Ihres Cognitive Services-API-Kontos eingeben können. Ersetzen Sie den Teil "Einfügen Ihres Schlüssels hier" des ".setSubscriptionKey" ("Einfügen Ihres Schlüssels hier") durch Ihren eigenen Schlüssel.

from pyspark.sql.functions import col
from pyspark.sql.types import *
from mmlspark.cognitive import SimpleDetectAnomalies
from mmlspark.core.spark import FluentAPI

anomaly_detector = (SimpleDetectAnomalies()
                            .setSubscriptionKey("paste-your-key-here")
                            .setUrl("https://westus2.api.cognitive.microsoft.com/anomalydetector/v1.0/timeseries/entire/detect")
                            .setOutputCol("anomalies")
                            .setGroupbyCol("grouping")
                            .setSensitivity(95)
                            .setGranularity("secondly"))

Sie finden den Schlüssel im Azure-Portal.

スクリーンショット 2020-08-19 1.23.43.png

Führen Sie nach dem Ersetzen der Schlüssel Ihr Notizbuch aus, um die Ergebnisse anzuzeigen.

スクリーンショット 2020-08-19 1.33.33.png

Leider fehlten in meiner Umgebung Daten, und es sah nicht wie das Beispielbild in meinem Notizbuch aus. Es scheint, dass der Inhalt voreingenommen war, wahrscheinlich aufgrund fehlender Daten. ** Bitte lassen Sie mich in den Kommentaren wissen, ob Sie diese Demo in Ihrer Umgebung ausführen und die Abbildung wie im Beispielbild reproduzieren können! ** _ (Ich möchte nach der Datenmenge im Azure Cosmos DB-Container usw. fragen.) _

Umweltsanierung

Wenn Sie mit der Demo fertig sind, ** entfernen Sie unnötige Umgebungen, um eine Abrechnung zu vermeiden **.

In Azure Cosmos DB ist Free Tier jedoch nur für Demo-Umgebungen aktiviert, und der Azure Synapse-Arbeitsbereich beendet die Berechnung der Abrechnung, sobald der Spark-Pool inaktiv ist. Wenn Sie den Azure Synapse-Arbeitsbereich selbst nicht verwenden, werden Ihnen einige hundert Yen pro Monat berechnet. Wenn Sie keine anderen Azure Cosmos DB-Konten oder -Datenbanken haben, können Sie diese für spätere Studien belassen. Es liegt an Ihnen, was zu tun ist. Ich weiß jedoch nicht, wann sich das Abrechnungssystem ändern wird. Wenn Sie also keine plötzlich hohe Rechnung erhalten möchten, löschen Sie die Umgebung ordnungsgemäß.

Referenzinformationen für Bereinigungsarbeiten finden Sie im folgenden Artikel, der zum dritten Mal in diesem Artikel erscheint.

schließlich

Das diesmal eingeführte und erläuterte GitHub-Repository enthält eine weitere Demo von Azure Synapse Link für Azure Cosmos DB. Wenn Sie interessiert sind, versuchen Sie dies bitte auch.


Recommended Posts

Azure Cosmos DB ab 2020 - Azure Synapse Link-Demo (Teil 1)
Greifen Sie mit Spring Boot auf Azure Cosmos DB zu