[PYTHON] Azure Cosmos DB à partir de 2020 - Démonstration d'Azure Synapse Link (partie 1)

introduction

Cet article est un exemple d'article d'explication d'Azure Synapse Link pour Azure Cosmos DB publié sur GitHub GitHub. La description du chef de famille est en anglais. [Ici] La traduction japonaise est disponible dans le dépôt GitHub-ymasaoka, veuillez donc accéder à celui que vous voulez.

--Honke (anglais): [Azure-Samples / Synapse] GitHub --Traduction japonaise (non officielle): [ymasaoka / Synapse] GitHub-ymasaoka

L'exemple de lien Azure Synapse pour Azure Cosmos DB est [/ Notebooks / PySpark / Synapse Link for Cosmos DB samples /](https://github.com/ymasaoka/Synapse/tree/ja-jp/Notebooks/PySpark/ Il est disponible dans Synapse% 20Link% 20 pour% 20Cosmos% 20DB% 20 échantillons). Si vous souhaitez vérifier les détails, veuillez également vérifier ici.

Environnement

L'environnement suivant est requis pour exécuter cet exemple.

--Azure Cosmos DB Account (API SQL)

Veuillez consulter les articles suivants pour savoir comment créer un environnement.

Scénario 1-Internet des objets (IoT)

Cette fois, j'ai utilisé Azure Synapse Spark pour importer des données IoT en streaming et par lots dans Azure Cosmos DB, utilisé Azure Synapse Link pour effectuer des jointures et des agrégations, et utilisé Azure Cognitive Services sur Spark (MMLSpark). Anomaly Detector] Jusqu'à ce que vous exécutiez [Anomaly Detector]. [Anomaly Detector] [Anomaly Detector] est un service Azure (version préliminaire à compter du 18 août 2020) qui vous permet d'intégrer facilement des fonctions de détection d'anomalies dans votre application.

Ce flux

Ecrivez des données de streaming IoT et des données par lots dans le magasin de transactions Cosmos DB à l'aide du pool Azure Synapse Analytics Spark. Après cela, les données automatiquement synchronisées du magasin de transactions vers le magasin d'analyse sont référencées par Azure Synapse Link pour Azure Cosmos DB, et les données sont combinées et agrégées sur Azure Synapse Analytics.

dataflow.PNG

Préparation préalable

[/ Notebooks / PySpark / Synapse Link pour les exemples Cosmos DB / IoT / README.md](https://github.com/ymasaoka/Synapse/blob/ja-jp/Notebooks/PySpark/Synapse%20Link%20for%20Cosmos% Préparez et définissez les données en fonction du contenu de 20DB% 20samples / IoT / README.md).

Il y a cinq choses à faire ici:

Télécharger le fichier IoTDeviceInfo.csv

Le fichier ʻIoTDeviceInfo.csv` est [/ Notebooks / PySpark / Synapse Link pour les exemples Cosmos DB / IoT / IoTData](https://github.com/ymasaoka/Synapse/tree/ja-jp/Notebooks/PySpark/Synapse% 20Link% 20 pour% 20Cosmos% 20DB% 20samples / IoT / IoTData) C'est ci-dessous. Récupérez le fichier et téléchargez-le sur Azure Data Lake Storage à partir de l'écran de l'espace de travail Azure Synapse.

Commencez par créer un dossier ** IoTData **. Sélectionnez l'onglet ** Données / Lié **, sélectionnez l'environnement Azure Data Lake Storage Gen 2 attaché à l'environnement Azure Synapse Analytics et sélectionnez ** + Nouveau dossier ** pour créer un nouveau dossier. Je vais.

スクリーンショット 2020-08-18 22.37.34.png

スクリーンショット 2020-08-18 22.40.27.png

Après avoir créé le dossier IotData, placez le fichier ʻIoTDeviceInfo.csv` dans le dossier IoTData.

スクリーンショット 2020-08-18 22.44.23.png

Paramètres de contrôle d'accès au compte de stockage (IAM)

Ajoutons un rôle dans l'onglet Contrôle d'accès (IAM).

スクリーンショット 2020-08-18 23.03.22.png

Le rôle à attribuer doit être ** Storage BLOB Data Co-Creator **. Ajoutez vos propres utilisateurs et ajoutez des rôles.

スクリーンショット 2020-08-18 23.09.50.png

Activer le lien Azure Synapse

Si vous n'activez pas Azure Synapse Link, vous ne pourrez pas créer de magasin d'analyse Azure Cosmos DB, alors activez-le. Sélectionnez le bouton ** Activer le lien Azure Synapse ** dans l'explorateur de données pour activer le lien Azure Synapse.

スクリーンショット 2020-08-18 23.16.46.png

スクリーンショット 2020-08-18 23.17.13.png

Création de bases de données et de conteneurs

Créez la base de données et le conteneur selon les informations ci-dessus. GitHub indique que lors de la création d'une base de données CosmosDBIoTDemo, ** activez le paramètre Autoscale et créez à 4000 RU / s **, mais jusqu'à ce que le traitement réel commence, ce sera 400 RU / s sans autoscale (Manuel). Il n'y a aucun problème à créer. Cela peut être changé plus tard. Les clés de partition pour le conteneur sont toutes les deux / id. Assurez-vous que ** Magasin analytique ** est sur * Activé * lorsque vous créez chaque conteneur.

Créer un service de liaison pour Azure Cosmos DB

Veuillez vous référer à l'article présenté au début pour cela. Le nom du service de liaison est «CosmosDBIoTDemo».

Une fois que vous avez créé le service lié, vous devriez pouvoir voir les informations Cosmos DB à partir de l'espace de travail Azure Synapse comme suit:

スクリーンショット 2020-08-18 23.32.39.png

01-CosmosDBSynapseStreamIngestion

Ceci est le premier cahier. Ici, nous utiliserons ** la diffusion en continu structurée pour importer des données en streaming dans la collection Azure Cosmos DB **. Comme le fichier IoTDeviceInfo.csv, le fichier .ipynb est GitHub(https://github.com/ymasaoka/Synapse/blob/ja-jp/Notebooks/PySpark/Synapse%20Link%20for%20Cosmos%20DB%20samples/IoT/ Puisqu'il se trouve sur spark-notebooks / pyspark / 01-CosmosDBSynapseStreamIngestion.ipynb), récupérez-le et importez-le depuis ** Develop ** dans l'espace de travail Azure Synapse.

スクリーンショット 2020-08-18 23.41.17.png

Lorsque l'importation est terminée, vous devriez voir votre notebook comme indiqué ci-dessous. Assurez-vous maintenant que ** Attach to ** est défini sur le pool Spark pré-créé et que ** Language ** est défini sur PySpark.

スクリーンショット 2020-08-18 23.42.31.png

Une fois cela fait, il ne vous reste plus qu'à exécuter votre notebook. ** Si vous avez modifié les paramètres de mise à l'échelle automatique d'Azure Cosmos DB avant l'exécution **, activez la mise à l'échelle automatique **. Si vous l'exécutez à 400 RU / s sans mise à l'échelle automatique, le code d'état ** Http 429 ** sera renvoyé et la saisie de données ne fonctionnera pas. Veuillez vous référer aux explications du carnet pour le code à exécuter. Comme décrit dans Readme.md, ce bloc-notes est principalement destiné à créer des données pour l'exécution d'échantillons dans la seconde moitié. Laissons-le pendant "2 à 5 minutes" après le début de l'exécution. Cependant, ** n'inclut pas le premier lancement du pool Spark dans cette durée de 2 à 5 minutes. ** Les pools Spark prennent beaucoup de temps à s'exécuter car ils démarrent le pool la première fois que PySpark s'exécute après son inactivité.

Comme le montre la figure ci-dessous, confirmons que le traitement de la cellule 3 et de la cellule 5 est terminé et que le traitement de la cellule 7 continue de fonctionner, puis laissez-le. Ce que l'on appelle, ce ** travail négligé est un échantillon de reproduction de la réception et du stockage de données en streaming IoT **.

スクリーンショット 2020-08-18 23.53.27.png

スクリーンショット 2020-08-18 23.54.34.png

Au fil du temps, vérifions que les données ont été remplies dans le conteneur ʻIoTSignals` d'Azure Cosmos DB. ** Les paramètres de mise à l'échelle automatique peuvent être restaurés et OK **.

スクリーンショット 2020-08-19 0.12.09.png

スクリーンショット 2020-08-19 0.13.30.png

02-CosmosDBSynapseBatchIngestion

Le prochain cahier. Ici, nous utiliserons ** Azure Synapse Spark pour importer des données par lots dans la collection Azure Cosmos DB **. Comme avec 01, GitHub(https://github.com/ymasaoka/Synapse/blob/ja-jp/Notebooks/PySpark/Synapse%20Link%20for%20Cosmos%20DB%20samples/IoT/spark-notebooks/ Obtenez le bloc-notes sur pyspark / 02-CosmosDBSynapseBatchIngestion.ipynb), importez-le dans l'espace de travail Azure Synapse et exécutez-le.

スクリーンショット 2020-08-19 0.16.55.png

Comme vous pouvez le voir en regardant le contenu de ʻIoTDeviceInfo.csv`, les données ne sont que 10 lignes en excluant la partie d'en-tête. Vous n'avez donc pas besoin d'annuler à nouveau vos paramètres de mise à l'échelle automatique Azure Cosmos DB **.

Une fois le bloc-notes exécuté, vérifions que les données ont été renseignées dans le conteneur ʻIoTDeviceInfo` d'Azure Cosmos DB. Il devrait y avoir 10 lignes de données correctement.

スクリーンショット 2020-08-19 0.28.52.png

03-CosmosDBSynapseJoins

Le prochain cahier. Ici, nous utilisons ** Azure Synapse Link pour effectuer des jointures et des agrégats dans les collections Azure Cosmos DB **. Tout d'abord, comme auparavant, GitHub(https://github.com/ymasaoka/Synapse/blob/ja-jp/Notebooks/PySpark/Synapse%20Link%20for%20Cosmos%20DB%20samples/IoT/spark-notebooks/pyspark / 03-CosmosDBSynapseJoins.ipynb) Récupérez le bloc-notes, importez-le dans l'espace de travail Azure Synapse et exécutez-le.

Je vais vous expliquer un peu ici. Auparavant, un bloc-notes importé dans un espace de travail Azure Synapse exécutait PySpark comme indiqué ci-dessous et ** écrivait dans le magasin de transactions Azure Cosmos DB **.

streamQuery = dfIoTSignals\
                    .writeStream\
                    .format("cosmos.oltp")\
                    .outputMode("append")\
                    .option("spark.cosmos.connection.mode", "gateway") \
                    .option("spark.synapse.linkedService", "CosmosDBIoTDemo")\
                    .option("spark.cosmos.container", "IoTSignals")\
                    .option("checkpointLocation", "/writeCheckpointDir")\
                    .start()

Ce à quoi nous voulons prêter attention ici est la partie .format (" cosmos.oltp "). cosmos.oltp vous permet de vous connecter au magasin de transactions Azure Cosmos DB. (Il a également été mentionné dans la partie «Le saviez-vous?» Du cahier) De là, nous accèderons au magasin d'analyse Azure Cosmos DB. Par conséquent, cosmos.oltp ne peut pas être utilisé. À la place, nous utiliserons «cosmos.olap».

Ce bloc-notes crée également une * Spark table qui est associée à une collection dans le magasin d'analyse Azure Cosmos DB. Cette table Spark est la ** table de métadonnées **, qui a pour rôle de ** transmettre la requête à la collection de magasins d'analyse Azure Cosmos DB correspondante ** lorsque la requête Spark SQL est exécutée.

%%sql
create database CosmosDBIoTDemo
%%sql
create table if not exists CosmosDBIoTDemo.IoTSignals
using cosmos.olap
options(spark.synapse.linkedService 'CosmosDBIoTDemo',
        spark.cosmos.container 'IoTSignals')
%%sql
create table if not exists CosmosDBIoTDemo.IoTDeviceInfo
using cosmos.olap
options(spark.synapse.linkedService 'CosmosDBIoTDemo',
        spark.cosmos.container 'IoTDeviceInfo')
df_RPM_details = spark.sql("select a.deviceid \
                                 , b.devicetype \
                                 , cast(b.location as string) as location\
                                 , cast(b.latitude as float) as latitude\
                                 , cast(b.longitude as float) as  longitude\
                                 , a.measuretype \
                                 , a.unitSymbol \
                                 , cast(sum(measureValue) as float) as measureValueSum \
                                 , count(*) as count \
                            from CosmosDBIoTDemo.IoTSignals a \
                            left join CosmosDBIoTDemo.IoTDeviceInfo b \
                            on a.deviceid = b.deviceid \
                            where a.unitSymbol = 'RPM' \
                            group by a.deviceid, b.devicetype, b.location, b.latitude, b.longitude, a.measuretype, a.unitSymbol")

Notez que les paramètres de mise à l'échelle automatique d'Azure Cosmos DB ne sont pas pertinents pour l'accès à Azure Cosmos DB Analysis Store. La mise à l'échelle automatique concerne le magasin de transactions et «l'accès au magasin de transactions et au magasin d'analyse est distinct». Encore une fois ** vous n'avez pas besoin de modifier les paramètres de mise à l'échelle automatique d'Azure Cosmos DB **.

Une fois l'exécution terminée, vous devriez pouvoir voir les données référencées à partir de la collection dans Azure Cosmos DB Analysis Store et ce que vous avez tracé sur la carte à l'aide de ces données.

スクリーンショット 2020-08-19 0.55.38.png

スクリーンショット 2020-08-19 0.56.44.png

04-CosmosDBSynapseML

Maintenant, pour le dernier cahier. En 03, je viens de prendre les données du signal de la turbine à vapeur de la centrale électrique reçue et de les tracer sur la carte. Cela seul peut être une grande réussite, mais je voudrais faire un pas de plus vers l'utilisation du Big Data. Donc, enfin, nous allons faire ** Synapse Spark (MMLSpark) avec Azure Synapse Link et Azure Cognitive Services pour effectuer la détection des anomalies **. Le bloc-notes sera le même qu'avant GitHub(https://github.com/ymasaoka/Synapse/blob/ja-jp/Notebooks/PySpark/Synapse%20Link%20for%20Cosmos%20DB%20samples/IoT/spark -notebooks / pyspark / 04-CosmosDBSynapseML.ipynb) Veuillez l'importer.

L'apprentissage automatique augmentera le volume d'articles lorsqu'il sera touché, donc je ne le décrirai pas ici, mais ici, [MMLSpark (Azure Cognitive Services on Spark)](https://mmlspark.blob.core.windows.net/website/ index.html) est utilisé. ** Vous devez disposer d'un compte API Cognitive Services pour exécuter ce notebook car il utilise l'API Anomaly Detector, qui fait partie de Cognitive Services. ** _ (Disons-le lors de la construction du premier environnement ...) _

Créons-en un nouveau à partir du portail Azure. Le détecteur d'anomalies peut également être recherché et créé sur le portail, mais à partir du 18/08/2020, veuillez le créer avec Cognitive Services.

スクリーンショット 2020-08-19 1.15.05.png

Dans la cellule 8, il y a une partie pour entrer la clé de votre compte API Cognitive Services. Remplacez la partie coller-votre-clé-ici de la .setSubscriptionKey (" coller-votre-clé-ici ") par votre propre clé.

from pyspark.sql.functions import col
from pyspark.sql.types import *
from mmlspark.cognitive import SimpleDetectAnomalies
from mmlspark.core.spark import FluentAPI

anomaly_detector = (SimpleDetectAnomalies()
                            .setSubscriptionKey("paste-your-key-here")
                            .setUrl("https://westus2.api.cognitive.microsoft.com/anomalydetector/v1.0/timeseries/entire/detect")
                            .setOutputCol("anomalies")
                            .setGroupbyCol("grouping")
                            .setSensitivity(95)
                            .setGranularity("secondly"))

Vous pouvez trouver la clé sur le portail Azure.

スクリーンショット 2020-08-19 1.23.43.png

Après avoir remplacé les clés, exécutez votre ordinateur portable pour voir les résultats.

スクリーンショット 2020-08-19 1.33.33.png

Malheureusement, mon environnement manquait de données et il ne ressemblait pas à l'image d'exemple de mon cahier. Il semble que le contenu était biaisé, probablement en raison du manque de données. ** S'il vous plaît laissez-moi savoir dans les commentaires si vous pouvez exécuter cette démo dans votre environnement et reproduire le diagramme comme indiqué dans l'image d'exemple! ** _ (je voudrais poser des questions sur la quantité de données dans le conteneur Azure Cosmos DB, etc.) _

Nettoyage environnemental

Lorsque vous avez terminé la démonstration, ** supprimez les environnements inutiles pour éviter la facturation **.

Cela dit, Azure Cosmos DB a l'option Free Tier activée pour les environnements de démonstration uniquement, et l'espace de travail Azure Synapse arrêtera de calculer la facturation une fois que le pool Spark sera inactif. Si vous n'utilisez pas l'espace de travail Azure Synapse lui-même, vous serez facturé quelques centaines de yens par mois. Si vous ne disposez d'aucun autre compte ou base de données Azure Cosmos DB, vous souhaiterez peut-être les laisser pour une étude ultérieure. C'est à vous de décider quoi faire. Cependant, je ne sais pas quand le système de facturation changera, donc si vous ne voulez pas recevoir une facture soudainement élevée, supprimez l'environnement correctement.

Des informations de référence pour le travail de nettoyage se trouvent dans l'article suivant, qui apparaîtra pour la troisième fois dans cet article.

à la fin

Le référentiel GitHub présenté et expliqué cette fois contient une autre démonstration d'Azure Synapse Link pour Azure Cosmos DB. Si vous êtes intéressé, essayez ceci également.


Recommended Posts

Azure Cosmos DB à partir de 2020 - Démonstration d'Azure Synapse Link (partie 1)
Accéder à Azure Cosmos DB avec Spring Boot