[Python] Azure Cosmos DB starting in 2020 - Azure Synapse Link Demo (Part 1)

Introduction

This article is a walkthrough of the Azure Synapse Link for Azure Cosmos DB samples published on GitHub. The upstream repository is written in English; a Japanese translation is available in the ymasaoka repository, so please use whichever you prefer.

- Upstream (English): [Azure-Samples/Synapse](https://github.com/Azure-Samples/Synapse)
- Japanese translation (unofficial): [ymasaoka/Synapse](https://github.com/ymasaoka/Synapse)

The Azure Synapse Link for Azure Cosmos DB samples are available under [/Notebooks/PySpark/Synapse Link for Cosmos DB samples/](https://github.com/ymasaoka/Synapse/tree/ja-jp/Notebooks/PySpark/Synapse%20Link%20for%20Cosmos%20DB%20samples). If you want to check the details, please look there as well.

Environment

The following environment is required to run this sample.

- An Azure Cosmos DB account (SQL API)
- An Azure Synapse workspace with a Spark pool

Please refer to the following articles for how to create an environment.

- [Try Azure Synapse Link][myqiita01]

Scenario 1 - Internet of Things (IoT)

This time, we use Azure Synapse Spark to ingest streaming and batch IoT data into Azure Cosmos DB, use Azure Synapse Link to perform joins and aggregations, and run [Anomaly Detector] with Azure Cognitive Services on Spark (MMLSpark). Anomaly Detector is an Azure service (in preview as of August 18, 2020) that lets you easily build anomaly detection into your apps.

Flow of this demo

We write IoT streaming data and batch data to the Azure Cosmos DB transactional store using an Azure Synapse Analytics Spark pool. The data is then automatically synchronized from the transactional store to the analytical store, where we reference it through Azure Synapse Link for Azure Cosmos DB and join and aggregate it on Azure Synapse Analytics.

dataflow.PNG

Advance preparation

Prepare and configure the data according to [/Notebooks/PySpark/Synapse Link for Cosmos DB samples/IoT/README.md](https://github.com/ymasaoka/Synapse/blob/ja-jp/Notebooks/PySpark/Synapse%20Link%20for%20Cosmos%20DB%20samples/IoT/README.md).

There are five things to do here:

- Upload the IoTDeviceInfo.csv file
- Configure access control (IAM) on the storage account associated with the Synapse workspace
- Enable Azure Synapse Link in Data Explorer
- Create the database and containers in your Azure Cosmos DB account
  - CosmosDBIoTDemo
    - IoTSignals (analytical store enabled)
    - IoTDeviceInfo (analytical store enabled)
- Create a linked service for Azure Cosmos DB in the Azure Synapse workspace

Upload IoTDeviceInfo.csv file

The `IoTDeviceInfo.csv` file lives under [/Notebooks/PySpark/Synapse Link for Cosmos DB samples/IoT/IoTData](https://github.com/ymasaoka/Synapse/tree/ja-jp/Notebooks/PySpark/Synapse%20Link%20for%20Cosmos%20DB%20samples/IoT/IoTData). Get the file and upload it to Azure Data Lake Storage from the Azure Synapse workspace screen.

First, create an **IoTData** folder. Select the **Data / Linked** tab, select the Azure Data Lake Storage Gen2 account attached to the Azure Synapse Analytics environment, and select **+ New Folder** to create a new folder.

スクリーンショット 2020-08-18 22.37.34.png

スクリーンショット 2020-08-18 22.40.27.png

After creating the IoTData folder, place the `IoTDeviceInfo.csv` file inside it.

スクリーンショット 2020-08-18 22.44.23.png
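By the way, if you prefer to script this upload instead of using the portal UI, something like the following sketch with the azure-storage-file-datalake package should work (the account name, file system name, and key are placeholder assumptions, not values from the sample):

```python
from azure.storage.filedatalake import DataLakeServiceClient

# Placeholder assumptions - replace with your own storage account, container, and key
ACCOUNT_NAME = "yoursynapsestorage"
FILE_SYSTEM = "yourfilesystem"
ACCOUNT_KEY = "your-account-key"

service = DataLakeServiceClient(
    account_url=f"https://{ACCOUNT_NAME}.dfs.core.windows.net",
    credential=ACCOUNT_KEY,
)
fs = service.get_file_system_client(FILE_SYSTEM)

# Create the IoTData folder and upload the CSV into it
fs.create_directory("IoTData")
with open("IoTDeviceInfo.csv", "rb") as f:
    fs.get_file_client("IoTData/IoTDeviceInfo.csv").upload_data(f, overwrite=True)
```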

Storage account access control (IAM) settings

Let's add a role on the Access Control (IAM) tab.

スクリーンショット 2020-08-18 23.03.22.png

The role to assign is **Storage Blob Data Contributor**. Add your own user and assign the role.

スクリーンショット 2020-08-18 23.09.50.png
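For reference, the same role assignment can be scripted with the azure-mgmt-authorization package. The sketch below is an illustration under stated assumptions (subscription, resource group, storage account, and principal IDs are placeholders), not part of the sample:

```python
import uuid

from azure.identity import DefaultAzureCredential
from azure.mgmt.authorization import AuthorizationManagementClient
from azure.mgmt.authorization.models import RoleAssignmentCreateParameters

# Placeholder assumptions - replace with your own values
SUBSCRIPTION_ID = "your-subscription-id"
SCOPE = (
    f"/subscriptions/{SUBSCRIPTION_ID}/resourceGroups/your-resource-group"
    "/providers/Microsoft.Storage/storageAccounts/yoursynapsestorage"
)
# Built-in role definition ID for Storage Blob Data Contributor
ROLE_DEFINITION_ID = (
    f"/subscriptions/{SUBSCRIPTION_ID}/providers/Microsoft.Authorization"
    "/roleDefinitions/ba92f5b4-2d11-453d-a403-e96b0029c9fe"
)
PRINCIPAL_ID = "object-id-of-your-user"

client = AuthorizationManagementClient(DefaultAzureCredential(), SUBSCRIPTION_ID)
client.role_assignments.create(
    SCOPE,
    str(uuid.uuid4()),  # the role assignment name must be a new GUID
    RoleAssignmentCreateParameters(
        role_definition_id=ROLE_DEFINITION_ID,
        principal_id=PRINCIPAL_ID,
    ),
)
```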

Enable Azure Synapse Link

If you don't enable Azure Synapse Link, you can't create an Azure Cosmos DB analytical store, so enable it first. Select the **Enable Azure Synapse Link** button in Data Explorer to enable Azure Synapse Link.

スクリーンショット 2020-08-18 23.16.46.png

スクリーンショット 2020-08-18 23.17.13.png
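This portal toggle corresponds to the account-level `enable_analytical_storage` property. If you script your environment setup, a minimal sketch with the azure-mgmt-cosmosdb package looks roughly like this (all names are placeholder assumptions):

```python
from azure.identity import DefaultAzureCredential
from azure.mgmt.cosmosdb import CosmosDBManagementClient
from azure.mgmt.cosmosdb.models import DatabaseAccountUpdateParameters

# Placeholder assumptions - replace with your own values
SUBSCRIPTION_ID = "your-subscription-id"
RESOURCE_GROUP = "your-resource-group"
ACCOUNT_NAME = "your-cosmos-account"

client = CosmosDBManagementClient(DefaultAzureCredential(), SUBSCRIPTION_ID)
poller = client.database_accounts.begin_update(
    RESOURCE_GROUP,
    ACCOUNT_NAME,
    DatabaseAccountUpdateParameters(enable_analytical_storage=True),
)
poller.result()  # block until the account update finishes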

Creating databases and containers

Create the database and containers according to the information above. GitHub says to **enable autoscale and provision 4000 RU/s** when creating the CosmosDBIoTDemo database, but until the actual processing starts there is no problem creating it at 400 RU/s without autoscale (Manual); this can be changed later. The partition key for both containers is /id. Make sure **Analytical store** is set to *On* when you create each container.
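If you'd rather create these from code, here is a minimal sketch using the azure-cosmos Python SDK, assuming placeholder account values; `analytical_storage_ttl=-1` is what turns the analytical store on for a container:

```python
from azure.cosmos import CosmosClient, PartitionKey

# Placeholder assumptions - replace with your account URI and key
client = CosmosClient(
    "https://your-cosmos-account.documents.azure.com:443/",
    credential="your-account-key",
)

# Shared database throughput; raise it (or switch to autoscale) before running the demo
db = client.create_database_if_not_exists("CosmosDBIoTDemo", offer_throughput=400)

for name in ("IoTSignals", "IoTDeviceInfo"):
    db.create_container_if_not_exists(
        id=name,
        partition_key=PartitionKey(path="/id"),
        analytical_storage_ttl=-1,  # -1 = analytical store enabled, records never expire
    )
```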

Create a linked service for Azure Cosmos DB

Please refer to the article introduced at the beginning for this step. The linked service name is CosmosDBIoTDemo.

- [Try Azure Synapse Link][myqiita01]

Once you've created your linked service, you should be able to see your Cosmos DB information from your Azure Synapse workspace, as shown below.

スクリーンショット 2020-08-18 23.32.39.png

01-CosmosDBSynapseStreamIngestion

This is the first notebook. Here, we use **structured streaming to ingest streaming data into an Azure Cosmos DB collection**. Like the IoTDeviceInfo.csv file, the .ipynb file is on [GitHub](https://github.com/ymasaoka/Synapse/blob/ja-jp/Notebooks/PySpark/Synapse%20Link%20for%20Cosmos%20DB%20samples/IoT/spark-notebooks/pyspark/01-CosmosDBSynapseStreamIngestion.ipynb), so get it and import it from **Develop** in the Azure Synapse workspace.

スクリーンショット 2020-08-18 23.41.17.png

When the import is complete, you should see the notebook as shown below. Make sure that **Attach to** is set to the pre-created Spark pool and **Language** is set to PySpark.

スクリーンショット 2020-08-18 23.42.31.png

Once you've done that, all you have to do is run the notebook. **If you changed the Azure Cosmos DB autoscale setting earlier, enable autoscale before running.** If you run at 400 RU/s without autoscale, requests are throttled with status code **HTTP 429** and the data ingestion will not succeed. See the explanation in the notebook for the code being executed. As described in the README.md, this notebook mainly creates the data used by the samples in the second half. Leave it running for 2 to 5 minutes after execution starts. Note that **these 2 to 5 minutes do not include the first startup of the Spark pool**: the first PySpark run after the pool goes idle takes a long time because the pool has to start up.

As shown in the figures below, confirm that the processing of Cell 3 and Cell 5 has completed and that Cell 7 keeps running, then leave it alone. This leave-it-running state is **a sample that reproduces receiving and saving IoT streaming data**.
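For reference, the streaming data Cell 7 keeps writing is generated with Spark's built-in rate source. A minimal sketch of that generation step looks like the following (the derived column names and values are illustrative assumptions; the writeStream half of the pipeline appears in the notebook 03 section below):

```python
from pyspark.sql.functions import col, lit, rand

# The "rate" source emits (timestamp, value) rows at a fixed pace; the sample
# derives simulated IoT signal fields from them (names assumed for illustration)
dfIoTSignals = (spark.readStream
    .format("rate")
    .option("rowsPerSecond", 10)
    .load()
    .withColumn("id", col("value").cast("string"))  # unique document id
    .withColumn("deviceId", lit("dev-1"))
    .withColumn("unitSymbol", lit("RPM"))
    .withColumn("measureValue", (rand() * 1000).cast("int")))
```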

スクリーンショット 2020-08-18 23.53.27.png

スクリーンショット 2020-08-18 23.54.34.png

After some time, verify that the data has been written to the `IoTSignals` container in Azure Cosmos DB. **You can now restore the autoscale setting.**

スクリーンショット 2020-08-19 0.12.09.png

スクリーンショット 2020-08-19 0.13.30.png

02-CosmosDBSynapseBatchIngestion

The next notebook. Here we use **Azure Synapse Spark to ingest batch data into an Azure Cosmos DB collection**. As with 01, get the notebook from [GitHub](https://github.com/ymasaoka/Synapse/blob/ja-jp/Notebooks/PySpark/Synapse%20Link%20for%20Cosmos%20DB%20samples/IoT/spark-notebooks/pyspark/02-CosmosDBSynapseBatchIngestion.ipynb), import it into your Azure Synapse workspace, and run it.

スクリーンショット 2020-08-19 0.16.55.png

As you can see from the contents of `IoTDeviceInfo.csv`, the data is only 10 rows excluding the header. So **you don't need to re-enable autoscale on Azure Cosmos DB here**.
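The heart of this notebook is an ordinary batch write using the same `cosmos.oltp` format as the streaming case, roughly like this sketch (the ADLS path is an assumption, so adjust it to wherever you uploaded the file):

```python
# Read the uploaded CSV from ADLS Gen2 (path is an assumption - adjust to your account)
dfDeviceInfo = (spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("abfss://yourfilesystem@yoursynapsestorage.dfs.core.windows.net/IoTData/IoTDeviceInfo.csv"))

# Batch-write the 10 rows into the IoTDeviceInfo container's transactional store
(dfDeviceInfo.write
    .format("cosmos.oltp")
    .option("spark.synapse.linkedService", "CosmosDBIoTDemo")
    .option("spark.cosmos.container", "IoTDeviceInfo")
    .mode("append")
    .save())
```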

After the notebook has finished running, verify that the data has been written to the `IoTDeviceInfo` container in Azure Cosmos DB. There should be exactly 10 rows.

スクリーンショット 2020-08-19 0.28.52.png

03-CosmosDBSynapseJoins

The next notebook. Here we use **Azure Synapse Link to perform joins and aggregations across Azure Cosmos DB collections**. First, as before, get the notebook from [GitHub](https://github.com/ymasaoka/Synapse/blob/ja-jp/Notebooks/PySpark/Synapse%20Link%20for%20Cosmos%20DB%20samples/IoT/spark-notebooks/pyspark/03-CosmosDBSynapseJoins.ipynb), import it into the Azure Synapse workspace, and run it.

Let me explain a little here. The earlier notebooks ran PySpark like the following to **write to the Azure Cosmos DB transactional store**.

```python
streamQuery = dfIoTSignals\
                    .writeStream\
                    .format("cosmos.oltp")\
                    .outputMode("append")\
                    .option("spark.cosmos.connection.mode", "gateway") \
                    .option("spark.synapse.linkedService", "CosmosDBIoTDemo")\
                    .option("spark.cosmos.container", "IoTSignals")\
                    .option("checkpointLocation", "/writeCheckpointDir")\
                    .start()
```

What we want to focus on here is the `.format("cosmos.oltp")` part. `cosmos.oltp` connects to the Azure Cosmos DB transactional store (this was also mentioned in the "Did you know?" part of the notebook). From here on, we access the Azure Cosmos DB analytical store, so `cosmos.oltp` cannot be used; use `cosmos.olap` instead.
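Besides the Spark SQL metadata tables described next, the analytical store can also be read directly into a DataFrame. A minimal sketch with the same linked service and container names:

```python
# Read the IoTSignals collection from the analytical store (cosmos.olap),
# which does not consume transactional-store RUs
dfIoTSignals = (spark.read
    .format("cosmos.olap")
    .option("spark.synapse.linkedService", "CosmosDBIoTDemo")
    .option("spark.cosmos.container", "IoTSignals")
    .load())
```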

This notebook also creates Spark tables associated with collections in the Azure Cosmos DB analytical store. These Spark tables are so-called **metadata tables**: when a Spark SQL query runs, they **pass the query through to the corresponding collection in the Azure Cosmos DB analytical store**.

```sql
%%sql
create database CosmosDBIoTDemo
```

```sql
%%sql
create table if not exists CosmosDBIoTDemo.IoTSignals
using cosmos.olap
options(spark.synapse.linkedService 'CosmosDBIoTDemo',
        spark.cosmos.container 'IoTSignals')
```

```sql
%%sql
create table if not exists CosmosDBIoTDemo.IoTDeviceInfo
using cosmos.olap
options(spark.synapse.linkedService 'CosmosDBIoTDemo',
        spark.cosmos.container 'IoTDeviceInfo')
```

```python
df_RPM_details = spark.sql("select a.deviceid \
                                 , b.devicetype \
                                 , cast(b.location as string) as location\
                                 , cast(b.latitude as float) as latitude\
                                 , cast(b.longitude as float) as  longitude\
                                 , a.measuretype \
                                 , a.unitSymbol \
                                 , cast(sum(measureValue) as float) as measureValueSum \
                                 , count(*) as count \
                            from CosmosDBIoTDemo.IoTSignals a \
                            left join CosmosDBIoTDemo.IoTDeviceInfo b \
                            on a.deviceid = b.deviceid \
                            where a.unitSymbol = 'RPM' \
                            group by a.deviceid, b.devicetype, b.location, b.latitude, b.longitude, a.measuretype, a.unitSymbol")
```

Note that the Azure Cosmos DB autoscale setting is irrelevant to access to the Azure Cosmos DB analytical store. Autoscale concerns the transactional store, and transactional-store access and analytical-store access are distinct. So, once again, **you don't need to change the Azure Cosmos DB autoscale settings**.

When it's done, you should see the data read from the collections in the Azure Cosmos DB analytical store, along with a plotly visualization of that data on a map.

スクリーンショット 2020-08-19 0.55.38.png

スクリーンショット 2020-08-19 0.56.44.png

04-CosmosDBSynapseML

Now for the last notebook. In 03, we just took signal data received from power plant steam turbines and plotted it on a map. That alone may be a big achievement, but let's take one more step toward making use of big data. So, finally, we'll use **Synapse Spark (MMLSpark) with Azure Synapse Link and Azure Cognitive Services to perform anomaly detection**. As before, the notebook is on [GitHub](https://github.com/ymasaoka/Synapse/blob/ja-jp/Notebooks/PySpark/Synapse%20Link%20for%20Cosmos%20DB%20samples/IoT/spark-notebooks/pyspark/04-CosmosDBSynapseML.ipynb). Please import it.

I won't go into machine learning itself here, as it would make this article much longer, but this notebook uses MMLSpark (Azure Cognitive Services on Spark). **Because it uses the Anomaly Detector API, which is part of Cognitive Services, you must have a Cognitive Services API account to run this notebook.** _(I should have mentioned that back when we built the environment...)_

Let's create one from the Azure portal. Anomaly Detector can also be found and created on the portal as its own resource, but as of 2020-08-18, please create it as a Cognitive Services resource.

スクリーンショット 2020-08-19 1.15.05.png

In Cell 8, there is a place to enter the key of your Cognitive Services API account. Replace the paste-your-key-here part of `.setSubscriptionKey("paste-your-key-here")` with your own key.

```python
from pyspark.sql.functions import col
from pyspark.sql.types import *
from mmlspark.cognitive import SimpleDetectAnomalies
from mmlspark.core.spark import FluentAPI

anomaly_detector = (SimpleDetectAnomalies()
                            .setSubscriptionKey("paste-your-key-here")
                            .setUrl("https://westus2.api.cognitive.microsoft.com/anomalydetector/v1.0/timeseries/entire/detect")
                            .setOutputCol("anomalies")
                            .setGroupbyCol("grouping")
                            .setSensitivity(95)
                            .setGranularity("secondly"))
```

You can find the key on the Azure portal.

スクリーンショット 2020-08-19 1.23.43.png

After replacing the keys, run your notebook to see the results.
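Under the hood, `SimpleDetectAnomalies` is a regular Spark ML transformer, so the notebook applies it to a DataFrame along these lines (a sketch; `df_signals` and its column preparation are assumptions based on the configuration above):

```python
# df_signals is assumed to carry a timestamp, a value, and the "grouping" column
# configured above; the transformer adds the "anomalies" output column
df_anomalies = anomaly_detector.transform(df_signals)
df_anomalies.select("grouping", "anomalies").show(truncate=False)
```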

スクリーンショット 2020-08-19 1.33.33.png

Unfortunately, my environment lacked data, and the result didn't look like the sample image in the notebook. The output seems skewed, probably because there wasn't enough data. **If you run this demo in your environment and reproduce the figure from the sample image, please let me know in the comments!** _(I'd like to hear about the amount of data in your Azure Cosmos DB container, etc.)_

Cleaning up the environment

When you're done with the demo, **delete unnecessary resources to avoid being billed**.

That said, in my case the Azure Cosmos DB account for this demo has the free tier enabled, and the Azure Synapse workspace stops billing for compute once the Spark pool goes idle, so even leaving the Azure Synapse workspace itself unused costs only a few hundred yen a month. If you don't have any other Azure Cosmos DB accounts or databases, you might want to keep the environment for further study; that's up to you. However, billing can change at any time, so if you don't want to be surprised by a high bill, delete the environment properly.

Reference information for the cleanup work can be found in the following article, which makes its third appearance here.

- [Try Azure Synapse Link][myqiita01]

In closing

The GitHub repository introduced and explained in this article contains another demo of Azure Synapse Link for Azure Cosmos DB. If you are interested, please try that one as well.

