[PYTHON] I tried to create a Power BI report by processing CSV / JSON logs using Azure Databricks

Overview

I had a small project to analyze access logs and create reports with Azure Databricks, so I will share the procedure. The overall picture looks like this: collect logs from Defender ATP and the Office 365 Admin Center, store them in Azure Blob Storage, then use Azure Databricks to analyze, process, and integrate them for viewing in Power BI reports. workinghour0.png

Steps

Follow the 4 steps below. In **Step 1 & 2**, I do essentially the same processing twice, once with a Pandas DataFrame on my local PC and once with a DataFrame on Azure Databricks, to see the difference between Pandas and Spark DataFrames. Each starts by reading the CSV / JSON. Pandas is handy for quickly aggregating data of a manageable size, while Databricks is suited to processing large volumes of data. **Step 3** writes the DataFrame out in various formats, either for connecting from Power BI or for keeping a local copy. Finally, **Step 4** walks through viewing the result as a report in Power BI.

- **Step 1:** Read and process CSV files using Python Pandas / Azure Databricks
- **Step 2:** Process JSON files using Azure Databricks
- **Step 3:** Write the processed data to CSV / Parquet / Table
- **Step 4:** Connect to Databricks from Power BI and create a report

Step 1-1: Process Microsoft 365 Audit Log (CSV) with Python using Pandas

First, as a small-scale trial, import a CSV file in the local environment and process and visualize it with Python Pandas. The following entry may be helpful for setting up the Python development environment.

Install the Python environment with Anaconda

  1. Try importing with Pandas.

    import pandas as pd
    
    df = pd.read_csv('AuditLog_2020-01-11_2020-04-11.csv')
    print(df)
    
  2. Check the column names and data types to see what information you have.

    print(df.columns)
    df.dtypes
    
    Index(['CreationDate', 'UserIds', 'Operations', 'AuditData'], dtype='object')
    
    CreationDate    object
    UserIds         object
    Operations      object
    AuditData       object
    dtype: object
    
  3. Display the first 5 rows.

    df.head(5)
    
    CreationDate                   UserIds                Operations    AuditData
    0  2020-04-10T21:24:57.0000000Z  [email protected]  UserLoggedIn  {"CreationTime":"2020-04-10T21:24:57","Id":"ba..."
    1  2020-04-10T20:55:58.0000000Z  [email protected]  FileUploaded  {"CreationTime":"2020-04-10T20:55:58","Id":"80..."
    2  2020-04-10T20:32:49.0000000Z  [email protected]  UserLoggedIn  {"CreationTime":"2020-04-10T20:32:49","Id":"51..."
    3  2020-04-10T20:33:39.0000000Z  [email protected]  FileAccessed  {"CreationTime":"2020-04-10T20:33:39","Id":"c0..."
    4  2020-04-10T19:32:36.0000000Z  [email protected]  UserLoggedIn  {"CreationTime":"2020-04-10T19:32:36","Id":"28..."

  4. Since the data in the AuditData column is not used this time, drop the entire column. Adding the "inplace=True" option applies the change to the DataFrame in place.

    df.drop("AuditData", axis=1, inplace=True)
    
  5. The date / time data is in the CreationDate column, but it cannot be used as-is, so convert it to a datetime type.

    df['CreationDate'] = pd.to_datetime(df['CreationDate'])
    

    Before: 2020-04-10T21:24:57.0000000Z
    After:  2020-04-10 21:24:57

  6. Check the data types. CreationDate has been converted to "datetime64".

    df.dtypes
    
    CreationDate    datetime64[ns]
    UserIds                 object
    Operations              object
    dtype: object
    
  7. Add columns with data that we might need when creating Power BI reports. These could be created as measures on the Power BI side, but I added them as columns here, expecting better performance when viewing the report.

    df['Hour'] = df['CreationDate'].dt.hour
    df['Weekday_Name'] = df['CreationDate'].dt.day_name()  # .dt.weekday_name was removed in newer pandas
    df['DayofWeek'] = df['CreationDate'].dt.dayofweek
    
  8. Finally, check the column names and data types again.

    print(df.columns)
    df.dtypes
    
    Index(['CreationDate', 'UserIds', 'Operations', 'Hour', 'Weekday_Name', 'DayofWeek'],
      dtype='object')
    CreationDate    datetime64[ns]
    UserIds                 object
    Operations              object
    Hour                     int64
    Weekday_Name            object
    DayofWeek                int64
    dtype: object
    
  9. After confirming, write the result to a CSV file.

    df.to_csv('AuditLog_2020-01-11_2020-04-11_edited.csv')
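Before moving on to Databricks, a quick aggregation is a handy sanity check of the processed frame. This is just a minimal sketch using the 'Hour' and 'Operations' columns created above; it is not part of the original procedure.

``` python:Optional
# Minimal sketch: count audit events per hour and operation type,
# using the columns created in the steps above.
summary = (
    df.groupby(['Hour', 'Operations'])
      .size()
      .reset_index(name='Count')
      .sort_values('Count', ascending=False)
)
print(summary.head(10))
```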

Step 1-2: Process Microsoft 365 Audit Log (CSV) with Azure Databricks

If you only have a limited number of log files to analyze, Pandas is enough, but what if you want to analyze a large amount of log data that does not fit in memory at once? Let's try doing the same thing with a Spark DataFrame on Azure Databricks.

  1. Create an Azure Data Lake Storage Gen2 account and upload the CSV file. Reference: "Create an Azure Data Lake Storage Gen2 account"

  2. Load the CSV file into Azure Databricks. A team member's Qiita entry was helpful. Reference: "Mount Data Lake Storage Gen2 from Azure Databricks"

  3. For data handling in Databricks, this Qiita entry was helpful. Reference: "Memo of the one often used when handling data with pyspark"

  4. Mount the file system.

    configs = {"fs.azure.account.auth.type": "OAuth",
       "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
       "fs.azure.account.oauth2.client.id": "<Service Principal Application ID>",
       "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope = "<scope-name>", key = "<key-name>"),
       "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<AAD file system name tenant ID>/oauth2/token",
       "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://auditlog@<Storage account name>.dfs.core.windows.net/",
    mount_point = "/mnt/auditdata",
    extra_configs = configs)
    

If it is already mounted and an error occurs, unmount it once.

``` python:Optional
dbutils.fs.unmount("/mnt/auditdata")
```
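As a small variation, you can also check the existing mount points first and only mount when needed. This is just a sketch that assumes the same mount point and the configs dictionary defined above.

``` python:Optional
# Sketch: mount only if /mnt/auditdata is not already mounted,
# which avoids the "already mounted" error mentioned above.
if not any(m.mountPoint == "/mnt/auditdata" for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source = "abfss://auditlog@<Storage account name>.dfs.core.windows.net/",
        mount_point = "/mnt/auditdata",
        extra_configs = configs)
```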

  5. Read the CSV file. Specifying "inferschema='true'" here infers the data types as the data is loaded into the DataFrame.

    Spark_df = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/auditdata/AuditLog_2020-01-11_2020-04-11.csv")
    
  6. Check the column names and data types to see what information you have. The Spark DataFrame recognizes CreationDate as a timestamp type.

    Spark_df.printSchema()
    
    root
    |-- CreationDate: timestamp (nullable = true)
    |-- UserIds: string (nullable = true)
    |-- Operations: string (nullable = true)
    |-- AuditData: string (nullable = true)
    
  7. Display the first 5 rows. Passing False to the show method disables truncation and displays the full contents of each column.

    Spark_df.show(5, False)
    
    +-------------------+---------------------+------------+------------------------------------------+
    |CreationDate       |UserIds              |Operations  |AuditData                                 |
    +-------------------+---------------------+------------+------------------------------------------+
    |2020-04-10 21:24:57|[email protected]|UserLoggedIn|"{""CreationTime"":""2020-04-10T21:24:57"|
    |2020-04-10 20:55:58|[email protected]|FileUploaded|"{""CreationTime"":""2020-04-10T20:55:58"|
    |2020-04-10 20:32:49|[email protected]|UserLoggedIn|"{""CreationTime"":""2020-04-10T20:32:49"|
    |2020-04-10 20:33:39|[email protected]|FileAccessed|"{""CreationTime"":""2020-04-10T20:33:39"|
    |2020-04-10 19:32:36|[email protected]|UserLoggedIn|"{""CreationTime"":""2020-04-10T19:32:36"|
    +-------------------+---------------------+------------+------------------------------------------+
    only showing top 5 rows
    
  8. As before, exclude the AuditData column and add the columns we might need when creating a Power BI report.

    from pyspark.sql.functions import concat, date_format, col, lit
    Spark_df = Spark_df.select('CreationDate', 'UserIds', 'Operations', date_format('CreationDate', 'HH').alias('Hour'),date_format('CreationDate', 'u').alias('DayofWeek'), date_format('CreationDate', 'EE').alias('Weekday_Name'))
    
    Spark_df = Spark_df.withColumn("Day_Weekday",concat(col("DayofWeek"),lit('_'),col("Weekday_Name")))
    Spark_df.show()
    
    +-------------------+--------------------+-------------------+----+---------+------------+-----------+
    |       CreationDate|             UserIds|         Operations|Hour|DayofWeek|Weekday_Name|Day_Weekday|
    +-------------------+--------------------+-------------------+----+---------+------------+-----------+
    |2020-04-10 21:24:57|      abc@contoso...|       UserLoggedIn|  21|        5|         Fri|      5_Fri|
    |2020-04-10 20:55:58|      abc@contoso...|       FileUploaded|  20|        5|         Fri|      5_Fri|
    |2020-04-10 20:32:49|      abc@contoso...|       UserLoggedIn|  20|        5|         Fri|      5_Fri|
    |2020-04-10 20:33:39|      abc@contoso...|       FileAccessed|  20|        5|         Fri|      5_Fri|
    |2020-04-10 19:32:36|      abc@contoso...|       UserLoggedIn|  19|        5|         Fri|      5_Fri|
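To mirror the Pandas check, a quick Spark-side aggregation over the columns created above confirms the data looks reasonable. This is only an illustrative sketch, not part of the original steps.

``` python:Optional
# Sketch: count events per weekday/hour combination in the Spark DataFrame.
(Spark_df
    .groupBy("Day_Weekday", "Hour")
    .count()
    .orderBy("Day_Weekday", "Hour")
    .show())
```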
    

Step 2: Collect information from Defender ATP and process it in Azure Databricks Spark

There is a solution called Microsoft Defender Advanced Threat Protection (Defender ATP) that can prevent, detect, investigate, and respond to various threats against an enterprise environment. With a feature called Advanced Hunting, you can query up to 30 days of data stored in Microsoft Defender Security Center under various conditions and use it for analysis.

This time, let's collect Security Center information from Databricks using the REST API and process it in the same way as Step 1.

  1. To call the Advanced Hunting API from Python, first get an access token.

    import json
    import urllib.request
    import urllib.parse
    
    tenantId = '00000000-0000-0000-0000-000000000000' # Paste your own tenant ID here
    appId = '11111111-1111-1111-1111-111111111111' # Paste your own app ID here
    appSecret = '22222222-2222-2222-2222-222222222222' # Paste your own app secret here
    
    url = "https://login.windows.net/%s/oauth2/token" % (tenantId)
    
    resourceAppIdUri = 'https://api.securitycenter.windows.com'
    
    body = {
        'resource' : resourceAppIdUri,
        'client_id' : appId,
        'client_secret' : appSecret,
        'grant_type' : 'client_credentials'
    }
    
    data = urllib.parse.urlencode(body).encode("utf-8")
    
    req = urllib.request.Request(url, data)
    response = urllib.request.urlopen(req)
    jsonResponse = json.loads(response.read())
    aadToken = jsonResponse["access_token"]
    
  2. Run a Kusto query to get the information. This time we will collect logs of events in which a particular process initiated a network connection, so we can track user processes and analyze their activity.

    query = 'DeviceNetworkEvents' # Paste your own query here
    
    url = "https://api.securitycenter.windows.com/api/advancedqueries/run"
    headers = { 
    'Content-Type' : 'application/json',
    'Accept' : 'application/json',
    'Authorization' : "Bearer " + aadToken
    }
    
    data = json.dumps({ 'Query' : query }).encode("utf-8")
    
    req = urllib.request.Request(url, data, headers)
    response = urllib.request.urlopen(req)
    jsonResponse = json.loads(response.read())
    schema = jsonResponse["Schema"]
    results = jsonResponse["Results"]
    
  3. Store the information obtained from the Advanced Hunting API in Spark Dataframe.

    rddData = sc.parallelize(results)
    Spark_df2 = spark.read.json(rddData)
    
  4. Check the column names and data types to see what information you have. The date and time information is in the Timestamp column, but this time it was not recognized as a timestamp type.

    Spark_df2.printSchema()
    
    root
     |-- ActionType: string (nullable = true)
     |-- AppGuardContainerId: string (nullable = true)
     |-- DeviceId: string (nullable = true)
     |-- DeviceName: string (nullable = true)
     |-- InitiatingProcessAccountDomain: string (nullable = true)
     |-- InitiatingProcessAccountName: string (nullable = true)
     |-- InitiatingProcessAccountObjectId: string (nullable = true)
     |-- InitiatingProcessAccountSid: string (nullable = true)
     |-- InitiatingProcessAccountUpn: string (nullable = true)
     |-- InitiatingProcessCommandLine: string (nullable = true)
     |-- InitiatingProcessCreationTime: string (nullable = true)
     |-- InitiatingProcessFileName: string (nullable = true)
     |-- InitiatingProcessFolderPath: string (nullable = true)
     |-- InitiatingProcessId: long (nullable = true)
     |-- InitiatingProcessIntegrityLevel: string (nullable = true)
     |-- InitiatingProcessMD5: string (nullable = true)
     |-- InitiatingProcessParentCreationTime: string (nullable = true)
     |-- InitiatingProcessParentFileName: string (nullable = true)
     |-- InitiatingProcessParentId: long (nullable = true)
     |-- InitiatingProcessSHA1: string (nullable = true)
     |-- InitiatingProcessSHA256: string (nullable = true)
     |-- InitiatingProcessTokenElevation: string (nullable = true)
     |-- LocalIP: string (nullable = true)
     |-- LocalIPType: string (nullable = true)
     |-- LocalPort: long (nullable = true)
     |-- Protocol: string (nullable = true)
     |-- RemoteIP: string (nullable = true)
     |-- RemoteIPType: string (nullable = true)
     |-- RemotePort: long (nullable = true)
     |-- RemoteUrl: string (nullable = true)
     |-- ReportId: long (nullable = true)
     |-- Timestamp: string (nullable = true)
     |-- _corrupt_record: string (nullable = true)
    
  5. Use "InitiatingProcessFileName" to check the statistics for each process.

    Spark_df2.groupBy("InitiatingProcessFileName").count().sort("count", ascending=False).show()
    
    +-------------------------+-----+
    |InitiatingProcessFileName|count|
    +-------------------------+-----+
    |              svchost.exe|10285|
    |              MsSense.exe| 2179|
    |               chrome.exe| 1693|
    |     OfficeClickToRun.exe| 1118|
    |             OneDrive.exe|  914|
    |             AvastSvc.exe|  764|
    |     backgroundTaskHos...|  525|
    |      MicrosoftEdgeCP.exe|  351|
    
  6. Convert the "Timestamp" column to the timestamp type and store it under the column name "CreationDate", matching Step 1.

    from pyspark.sql.types import TimestampType
    
    Spark_df2 = Spark_df2.withColumn("CreationDate", Spark_df2["Timestamp"].cast(TimestampType()))
    Spark_df2.printSchema()
    
  7. As before, we'll exclude unnecessary columns and have columns with data that might be needed when creating a Power BI report.

    from pyspark.sql.functions import concat, date_format, col, lit
    
    Spark_df2 = Spark_df2.select('CreationDate', 'DeviceId', 'DeviceName', 'InitiatingProcessFileName', 'InitiatingProcessAccountName', 'RemoteUrl', 'RemoteIP', 'LocalIP',  date_format('CreationDate', 'HH').alias('Hour'),date_format('CreationDate', 'u').alias('DayofWeek'), date_format('CreationDate', 'EE').alias('Weekday_Name'))
    
    Spark_df2 = Spark_df2.withColumn("Day_Weekday",concat(col("DayofWeek"),lit('_'),col("Weekday_Name")))
    Spark_df2.show()
    
  8. Check the column names and data types again. Much cleaner now.

    Spark_df2.printSchema()
    
    root
     |-- CreationDate: timestamp (nullable = true)
     |-- DeviceId: string (nullable = true)
     |-- DeviceName: string (nullable = true)
     |-- InitiatingProcessFileName: string (nullable = true)
     |-- InitiatingProcessAccountName: string (nullable = true)
     |-- RemoteUrl: string (nullable = true)
     |-- RemoteIP: string (nullable = true)
     |-- LocalIP: string (nullable = true)
     |-- Hour: string (nullable = true)
     |-- DayofWeek: string (nullable = true)
     |-- Weekday_Name: string (nullable = true)
     |-- Day_Weekday: string (nullable = true)
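With the frame in this shape, ad-hoc filtering is straightforward. The sketch below is illustrative only: the process name and the output limit are arbitrary choices, not taken from the original.

``` python:Optional
# Sketch: remote URLs contacted by a single process, most frequent first.
from pyspark.sql.functions import col

(Spark_df2
    .filter(col("InitiatingProcessFileName") == "chrome.exe")
    .groupBy("RemoteUrl")
    .count()
    .sort("count", ascending=False)
    .show(10, False))
```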
    

Step 3: Write the processed data to CSV / Parquet / Table

Now that it looks good, let's write the data processed in Step 1 and Step 2 in various formats.

1. Handling CSV in Databricks (Databricks Documentation CSV files)

  1. The Step 1 data is in Spark_df and the Step 2 data is in Spark_df2, so let's write them out to CSV. coalesce(1) combines the output into a single file. If you need header information, set the "header" option to "true".

    Spark_df.coalesce(1).write.option("header", "true").csv("/mnt/auditdata/AuditLog_2020-01-11_2020-04-11_edited.csv")
    
  2. Confirm that the CSV output is created in the Azure Data Lake Storage Gen2 storage account mounted on Databricks. Spark creates a folder with the specified file name, and the actual CSV part file is stored directly under that folder (see the sketch after the screenshots for pulling it out as a single file). workinghour10.png

workinghour11.png
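If you want the output as a single flat file rather than a folder, one option is to copy the part file out with dbutils. This is a minimal sketch assuming the output path used above; the destination file name is arbitrary.

``` python:Optional
# Sketch: copy the single part-*.csv that Spark wrote inside the output folder
# to a plain file name next to it.
out_dir = "/mnt/auditdata/AuditLog_2020-01-11_2020-04-11_edited.csv"
part_file = [f.path for f in dbutils.fs.ls(out_dir) if f.name.startswith("part-")][0]
dbutils.fs.cp(part_file, "/mnt/auditdata/AuditLog_2020-01-11_2020-04-11_single.csv")
```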

(Reference) Reading the CSV back looks like this:

``` python:Input
#Spark Dataframe
Spark_df = spark.read.format('csv').options(
header='true', inferschema='true').load("/mnt/auditdata/Spark_df.csv")
display(Spark_df)

#pandas
import pandas as pd
pd_dataframe = pd.read_csv('/dbfs/mnt/auditdata/Spark_df.csv')
```

2. Handling Parquet in Databricks (Databricks Documentation Parquet files)

  1. Try writing in Parquet format as well.

    Spark_df.write.mode("append").parquet("/mnt/auditdata/parquet/audit")
    

(Reference) Reading the Parquet data back looks like this:

``` python:Input
#Python
data = sqlContext.read.parquet("/mnt/auditdata/parquet/audit")
display(data)

#Scala
%scala
val data = sqlContext.read.parquet("/mnt/auditdata/parquet/audit")
display(data)

#SQL
%sql
CREATE TEMPORARY TABLE scalaTable
USING parquet
OPTIONS (
  path "/mnt/auditdata/parquet/audit"
)

SELECT * FROM scalaTable
```
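If the logs keep growing, partitioning the Parquet output by date can make later date-range reads cheaper. A sketch under that assumption; the partitioned path is arbitrary and not part of the original steps.

``` python:Optional
# Sketch: write Parquet partitioned by calendar date for cheaper date-range reads.
from pyspark.sql.functions import to_date

(Spark_df
    .withColumn("Date", to_date("CreationDate"))
    .write.mode("append")
    .partitionBy("Date")
    .parquet("/mnt/auditdata/parquet/audit_partitioned"))
```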

3. Handling Tables in Databricks (Databricks Documentation Tables)

  1. Try writing in Databricks Table format as well.

    Spark_df.write.saveAsTable("worktime")
    
    worktime = spark.sql("select * from worktime")
    display(worktime.select("*"))
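Since Power BI picks up tables in Step 4, the Defender ATP frame from Step 2 can be saved the same way. This is just a sketch; the table name is an arbitrary example.

``` python:Optional
# Sketch: persist the Step 2 DataFrame as its own table so Power BI can load it too.
Spark_df2.write.mode("overwrite").saveAsTable("defender_networkevents")

display(spark.sql("select * from defender_networkevents limit 10"))
```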
    

Step 4: Connect to Databricks and create a report that you can view from Power BI

Finally, let's create a report that can be viewed in Power BI using the data so far.

  1. Start the Databricks Workspace from the Azure portal, select "Clusters" in the left panel, and select the cluster that hosts the table you want to connect to.

  2. In the cluster settings panel, select Advanced Options to display the JDBC / ODBC menu.

  3. The settings screen contains the following information.

Use this information to get the connection destination setting string.

https://<Hostname>:<Port>/<HTTP Path>

Specifically, it should be a string like the one below.

Server : https://xxx-xxx.1.azuredatabricks.net:443/sql/protocolv1/o/687887143797341/xxxx-xxxxxx-xxxxxxxx
  4. On the Databricks workspace management screen, click the user profile icon in the upper right and click "User Settings".

  5. Click the "Access Tokens" tab and click the "Generate New Token" button. workinghour18.png

  6. On the "Generate New Token" screen, write "Power BI" in the "Comment" field. This is optional, so you can leave it blank.

  7. Click the "Generate" button, then copy and save the created token.

  8. Launch Power BI Desktop and select "Spark" as the data source from "Get Data".

  9. In the Spark connection settings, paste the connection string you obtained earlier into the "Server" field. Select "HTTP" as the protocol and "DirectQuery" as the data connectivity mode, and click the "OK" button. workinghour16.png

  10. In the Spark connection settings, enter "token" in the "User name" field and paste the access token you saved earlier into the "Password" field. Click the "Connect" button. workinghour24.png

  11. The list of tables created in Step 3 will be displayed. Select the tables required for the Power BI report and click the "Load" button. workinghour25.png

  12. Using the data prepared in Steps 1 to 3, I finally created a report like this in Power BI Desktop. workinghour26.png

Summary

This time, I walked through log analysis and visualization using Databricks. It feels like we're only using a fraction of Databricks' potential; it should really shine in situations where distributed processing is required against a data lake that has accumulated a large amount of data.

Still, I felt that it is a versatile processing platform that supports languages such as Scala, Python, R, and SQL, covers stream processing, machine learning, and visualization, and integrates nicely with various Azure services including Power BI.

I can confidently recommend Azure Databricks to anyone who has data but is unsure how to use it, or who is struggling with data processing.

Bonus

I was also interested in linking with Azure SQL database and Cosmos DB, so I will try it next time.
