Verwenden Sie Azure Databricks, um Zugriffsprotokolle zu analysieren und Berichte zu erstellen. Ich hatte ein kleines Projekt namens, also werde ich das Verfahren teilen. Die Gesamtübersicht ist so. Sammeln Sie Protokolle von Defender ATP und Office 365 Admin Center und speichern Sie sie im Azure Blob-Speicher. Wir verwenden Azure Databricks zum Analysieren, Verarbeiten und Integrieren für die Anzeige in Power BI-Berichten.
Befolgen Sie die folgenden 4 Schritte. In ** Schritt 1 und 2 ** habe ich im Grunde dasselbe mit Pandas DataFrame auf dem lokalen PC und DataFrame auf Azure Databricks versucht, um den Unterschied zwischen Pandas und Spark Dataframe festzustellen. Beginnen Sie mit dem Lesen jeder CSV / JSON. Probieren Sie Pandas für die einfache Aggregation von Daten in handlicher Größe und Databricks für die Verarbeitung großer Datenmengen. ** Schritt 3 ** schreibt die Dataframe-Daten in verschiedene Formate, um eine Verbindung mit Power BI herzustellen oder um sie griffbereit zu halten. Schließlich in ** Schritt 4 ** von Power BI zur Anzeige als Bericht. Versicher dich.
Stellen Sie sich zunächst einen Komponententest vor, importieren Sie eine CSV-Datei in die lokale Umgebung und verarbeiten und visualisieren Sie sie mit Python Pandas. Die folgenden Einträge können für die Python-Entwicklungsumgebung hilfreich sein.
Python-Umgebung mit Anaconda installieren
Versuchen Sie, mit Pandas zu erfassen.
import pandas as pd
df = pd.read_csv('AuditLog_2020-01-11_2020-04-11.csv')
print(df)
Überprüfen Sie den Spaltennamen und den Typ, um festzustellen, über welche Informationen Sie verfügen.
print(df.columns)
df.dtypes
Index(['CreationDate', 'UserIds', 'Operations', 'AuditData'], dtype='object')
CreationDate object
UserIds object
Operations object
AuditData object
dtype: object
Versuchen Sie, die ersten 5 Zeilen anzuzeigen.
df.head(5)
CreationDate UserIds Operations AuditData
0 2020-04-10T21:24:57.0000000Z [email protected] UserLoggedIn {"CreationTime":"2020-04-10T21:24:57","Id":"ba..." 1 2020-04-10T20:55:58.0000000Z [email protected] FileUploaded {"CreationTime":"2020-04-10T20:55:58","Id":"80..." 2 2020-04-10T20:32:49.0000000Z [email protected] UserLoggedIn {"CreationTime":"2020-04-10T20:32:49","Id":"51..." 3 2020-04-10T20:33:39.0000000Z [email protected] FileAccessed {"CreationTime":"2020-04-10T20:33:39","Id":"c0..." 4 2020-04-10T19:32:36.0000000Z [email protected] UserLoggedIn {"CreationTime":"2020-04-10T19:32:36","Id":"28..." ```
Da die Daten in der AuditData-Spalte dieses Mal nicht verwendet werden, wird die gesamte Spalte gelöscht. Sie können die Änderungen im DataFrame widerspiegeln, indem Sie die Option "inplace = True" hinzufügen.
df.drop("AuditData", axis=1, inplace=True)
Datums- / Uhrzeitdaten werden in die Spalte "CreationDate" geschrieben, können jedoch nicht so verwendet werden, wie sie sind. Konvertieren Sie sie daher in den Datentyp "Datum / Uhrzeit".
df['CreationDate'] = pd.to_datetime(df['CreationDate'])
Vor dem Gebrauch: 2020-04-10T21:24:57.0000000Z Nach Gebrauch: 2020-04-10 21:24:57 ```
Überprüfen Sie den Datentyp. Es wurde in "datetime64" konvertiert.
df.dtypes
CreationDate datetime64[ns]
UserIds object
Operations object
dtype: object
Wir haben Spalten mit Daten, die wir möglicherweise beim Erstellen von Power BI-Berichten benötigen. Es ist möglich, eine Kennzahl auf der Power BI-Seite zu erstellen, aber ich dachte, dass dies die Leistung beim Anzeigen des Berichts verbessern würde, und habe sie daher in eine Spalte eingefügt.
df['Hour'] = df['CreationDate'].dt.hour
df['Weekday_Name'] = df['CreationDate'].dt.weekday_name
df['DayofWeek'] = df['CreationDate'].dt.dayofweek
Überprüfen Sie abschließend den Spaltennamen und den Typ.
print(df.columns)
df.dtypes
Index(['CreationDate', 'UserIds', 'Operations', 'Hour', 'Weekday_Name', 'DayofWeek'],
dtype='object')
CreationDate datetime64[ns]
UserIds object
Operations object
Hour int64
Weekday_Name object
DayofWeek int64
dtype: object
```
Schreiben Sie das Ergebnis nach Bestätigung in eine CSV-Datei.
df.to_csv('AuditLog_2020-01-11_2020-04-11_edited.csv')
Wenn Sie nur eine begrenzte Anzahl von Protokolldateien analysieren müssen, ist Pandas in Ordnung. Was ist jedoch, wenn Sie eine große Menge von Protokolldaten analysieren möchten, die nicht gleichzeitig in den Speicher passen? Versuchen Sie dasselbe mit Azure Databricks DataFrame.
Erstellen Sie ein Azure Data Lake Storage Gen2-Konto und laden Sie die CSV-Datei hoch. Referenz: Siehe Erstellen eines Azure Data Lake Storage Gen2-Kontos. Bitte.
Laden Sie die CSV-Datei in Azure Databricks. Der Qiita-Eintrag des Teammitglieds war hilfreich. Referenz: "Mounten von Data Lake Storage Gen2 aus Azure Databricks"
Für die Datenverarbeitung in Databricks war dieser Qiita-Eintrag hilfreich. Referenz: "Memo desjenigen, das häufig beim Umgang mit Daten mit pyspark verwendet wird"
Hängen Sie das Dateisystem ein.
configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": "<Diensthauptanwendungs-ID>",
"fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope = "<scope-name>", key = "<key-name>"),
"fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<Mandanten-ID des AAD-Dateisystemnamens>/oauth2/token",
"fs.azure.createRemoteFileSystemDuringInitialization": "true"}
dbutils.fs.mount(
source = "abfss://auditlog@<Name des Speicherkontos>.dfs.core.windows.net/",
mount_point = "/mnt/auditdata",
extra_configs = configs)
Wenn es bereits gemountet ist und ein Fehler auftritt, heben Sie die Bereitstellung einmal auf.
``` python:Optional
dbutils.fs.unmount("/mnt/auditdata") ```
Lesen Sie die CSV-Datei. Durch Angabe von "inferschema = 'true'" wird hier der Typtyp abgeleitet und die Daten im Datenrahmen gespeichert.
Spark_df = spark.read.format('csv').options(
header='true', inferschema='true').load("/mnt/auditdata/AuditLog_2020-01-11_2020-04-11.csv")
Überprüfen Sie den Spaltennamen und den Typ, um festzustellen, über welche Informationen Sie verfügen. Spark Dataframe erkennt CreationDate als Zeitstempeltyp.
Spark_df.printSchema()
root
|-- CreationDate: timestamp (nullable = true)
|-- UserIds: string (nullable = true)
|-- Operations: string (nullable = true)
|-- AuditData: string (nullable = true)
Versuchen Sie, die ersten 5 Zeilen anzuzeigen. Wenn Sie für die show-Methode False angeben, wird die Option Truncate entfernt und der gesamte Inhalt der Spaltendaten angezeigt.
Spark_df.show(5, False)
+-------------------+---------------------+------------+------------------------------------------+
|CreationDate |UserIds |Operations |AuditData |
+-------------------+---------------------+------------+------------------------------------------+
|2020-04-10 21:24:57|[email protected]|UserLoggedIn|"{""CreationTime"":""2020-04-10T21:24:57"|
|2020-04-10 20:55:58|[email protected]|FileUploaded|"{""CreationTime"":""2020-04-10T20:55:58"|
|2020-04-10 20:32:49|[email protected]|UserLoggedIn|"{""CreationTime"":""2020-04-10T20:32:49"|
|2020-04-10 20:33:39|[email protected]|FileAccessed|"{""CreationTime"":""2020-04-10T20:33:39"|
|2020-04-10 19:32:36|[email protected]|UserLoggedIn|"{""CreationTime"":""2020-04-10T19:32:36"|
+-------------------+---------------------+------------+------------------------------------------+
only showing top 5 rows
Nach wie vor schließen wir die AuditData-Spalte aus und lassen die Spalte Daten enthalten, die wir möglicherweise beim Erstellen eines Power BI-Berichts benötigen.
from pyspark.sql.functions import concat, date_format, col, lit
Spark_df = Spark_df.select('CreationDate', 'UserIds', 'Operations', date_format('CreationDate', 'HH').alias('Hour'),date_format('CreationDate', 'u').alias('DayofWeek'), date_format('CreationDate', 'EE').alias('Weekday_Name'))
Spark_df = Spark_df.withColumn("Day_Weekday",concat(col("DayofWeek"),lit('_'),col("Weekday_Name")))
Spark_df.show()
+-------------------+--------------------+-------------------+----+---------+------------+--------+
| CreationDate| UserIds| Operations|Hour|DayofWeek|Weekday_Name|Day_Weekday|
+-------------------+--------------------+-------------------+----+---------+------------+--------+
|2020-04-10 21:24:57|abc@contoso...| UserLoggedIn| 21| 5| Fri| 5_Fri|
|2020-04-10 20:55:58|abc@contoso...| FileUploaded| 20| 5| Fri| 5_Fri|
|2020-04-10 20:32:49|abc@contoso...| UserLoggedIn| 20| 5| Fri| 5_Fri|
|2020-04-10 20:33:39|abc@contoso...| FileAccessed| 20| 5| Fri| 5_Fri|
|2020-04-10 19:32:36|abc@contoso...| UserLoggedIn| 19| 5| Fri| 5_Fri|
Es gibt eine Lösung namens Microsoft Defender Advanced Threat Protection (DATP), mit der verschiedene Bedrohungen, die die Unternehmensumgebung bedrohen, vermieden, erkannt, untersucht und behandelt werden können. Mit der Funktion Advanced Hunting kann jedoch das Microsoft Defender Security Center verwendet werden. Sie können die gespeicherten Daten bis zu 30 Tage unter verschiedenen Bedingungen durchsuchen und zur Analyse verwenden.
Dieses Mal sammeln wir mithilfe der REST-API Security Center-Informationen von Databricks und verarbeiten sie auf die gleiche Weise wie in Schritt 1.
Um die Advanced Hunting API von Python aus aufzurufen, rufen Sie zunächst ein Zugriffstoken ab.
import json
import urllib.request
import urllib.parse
tenantId = '00000000-0000-0000-0000-000000000000' # Paste your own tenant ID here
appId = '11111111-1111-1111-1111-111111111111' # Paste your own app ID here
appSecret = '22222222-2222-2222-2222-222222222222' # Paste your own app secret here
url = "https://login.windows.net/%s/oauth2/token" % (tenantId)
resourceAppIdUri = 'https://api.securitycenter.windows.com'
body = {
'resource' : resourceAppIdUri,
'client_id' : appId,
'client_secret' : appSecret,
'grant_type' : 'client_credentials'
}
data = urllib.parse.urlencode(body).encode("utf-8")
req = urllib.request.Request(url, data)
response = urllib.request.urlopen(req)
jsonResponse = json.loads(response.read())
aadToken = jsonResponse["access_token"]
Führen Sie eine Kusto-Abfrage aus, um die Informationen abzurufen. Dieses Mal sammeln wir Protokolle, wenn ein bestimmter Prozess ein Ereignis mit einer Netzwerkverbindung auslöst. Sie können Benutzerprozesse verfolgen und Aktivitäten analysieren.
query = 'DeviceNetworkEvents' # Paste your own query here
url = "https://api.securitycenter.windows.com/api/advancedqueries/run"
headers = {
'Content-Type' : 'application/json',
'Accept' : 'application/json',
'Authorization' : "Bearer " + aadToken
}
data = json.dumps({ 'Query' : query }).encode("utf-8")
req = urllib.request.Request(url, data, headers)
response = urllib.request.urlopen(req)
jsonResponse = json.loads(response.read())
schema = jsonResponse["Schema"]
results = jsonResponse["Results"]
Speichern Sie die von der Advanced Hunting API erhaltenen Informationen in Spark Dataframe.
rddData = sc.parallelize(results)
Spark_df2 = spark.read.json(rddData)
Überprüfen Sie den Spaltennamen und den Typ, um festzustellen, über welche Informationen Sie verfügen. Die Datums- und Uhrzeitinformationen werden im Zeitstempel gespeichert, diesmal jedoch nicht vom Zeitstempeltyp erkannt.
Spark_df2.printSchema()
root
|-- ActionType: string (nullable = true)
|-- AppGuardContainerId: string (nullable = true)
|-- DeviceId: string (nullable = true)
|-- DeviceName: string (nullable = true)
|-- InitiatingProcessAccountDomain: string (nullable = true)
|-- InitiatingProcessAccountName: string (nullable = true)
|-- InitiatingProcessAccountObjectId: string (nullable = true)
|-- InitiatingProcessAccountSid: string (nullable = true)
|-- InitiatingProcessAccountUpn: string (nullable = true)
|-- InitiatingProcessCommandLine: string (nullable = true)
|-- InitiatingProcessCreationTime: string (nullable = true)
|-- InitiatingProcessFileName: string (nullable = true)
|-- InitiatingProcessFolderPath: string (nullable = true)
|-- InitiatingProcessId: long (nullable = true)
|-- InitiatingProcessIntegrityLevel: string (nullable = true)
|-- InitiatingProcessMD5: string (nullable = true)
|-- InitiatingProcessParentCreationTime: string (nullable = true)
|-- InitiatingProcessParentFileName: string (nullable = true)
|-- InitiatingProcessParentId: long (nullable = true)
|-- InitiatingProcessSHA1: string (nullable = true)
|-- InitiatingProcessSHA256: string (nullable = true)
|-- InitiatingProcessTokenElevation: string (nullable = true)
|-- LocalIP: string (nullable = true)
|-- LocalIPType: string (nullable = true)
|-- LocalPort: long (nullable = true)
|-- Protocol: string (nullable = true)
|-- RemoteIP: string (nullable = true)
|-- RemoteIPType: string (nullable = true)
|-- RemotePort: long (nullable = true)
|-- RemoteUrl: string (nullable = true)
|-- ReportId: long (nullable = true)
|-- Timestamp: string (nullable = true)
|-- _corrupt_record: string (nullable = true)
Verwenden Sie "InitiatingProcessFileName", um die Statistiken für jeden Prozess zu überprüfen.
Spark_df2.groupBy("InitiatingProcessFileName").count().sort("count", ascending=False).show()
+-------------------------+-----+
|InitiatingProcessFileName|count|
+-------------------------+-----+
| svchost.exe|10285|
| MsSense.exe| 2179|
| chrome.exe| 1693|
| OfficeClickToRun.exe| 1118|
| OneDrive.exe| 914|
| AvastSvc.exe| 764|
| backgroundTaskHos...| 525|
| MicrosoftEdgeCP.exe| 351|
Konvertieren Sie den Datentyp der Spalte "Zeitstempel" in den Zeitstempeltyp und speichern Sie ihn zusammen mit Schritt 1 unter dem Spaltennamen "CreationDate".
from pyspark.sql.types import TimestampType
Spark_df2 = Spark_df2.withColumn("CreationDate", Spark_df2["Timestamp"].cast(TimestampType()))
Spark_df2.printSchema()
Nach wie vor schließen wir unnötige Spalten aus und verfügen über Spalten mit Daten, die möglicherweise beim Erstellen eines Power BI-Berichts benötigt werden.
from pyspark.sql.functions import concat, date_format, col, lit
Spark_df2 = Spark_df2.select('CreationDate', 'DeviceId', 'DeviceName', 'InitiatingProcessFileName', 'InitiatingProcessAccountName', 'RemoteUrl', 'RemoteIP', 'LocalIP', date_format('CreationDate', 'HH').alias('Hour'),date_format('CreationDate', 'u').alias('DayofWeek'), date_format('CreationDate', 'EE').alias('Weekday_Name'))
Spark_df2 = Spark_df2.withColumn("Day_Weekday",concat(col("DayofWeek"),lit('_'),col("Weekday_Name")))
Spark_df2.show()
Überprüfen Sie den Spaltennamen und den Typ. Es war erfrischend.
Spark_df2.printSchema()
root
|-- CreationDate: timestamp (nullable = true)
|-- DeviceId: string (nullable = true)
|-- DeviceName: string (nullable = true)
|-- InitiatingProcessFileName: string (nullable = true)
|-- InitiatingProcessAccountName: string (nullable = true)
|-- RemoteUrl: string (nullable = true)
|-- RemoteIP: string (nullable = true)
|-- LocalIP: string (nullable = true)
|-- Hour: string (nullable = true)
|-- DayofWeek: string (nullable = true)
|-- Weekday_Name: string (nullable = true)
|-- Day_Weekday: string (nullable = true)
Nachdem es nun gut aussieht, schreiben wir die in Schritt 1 und Schritt 2 verarbeiteten Daten in verschiedenen Formaten.
Da die Daten von Schritt 1 in Spark_df und die Daten von Schritt 2 in Spark_df2 erstellt werden, schreiben wir sie in die CSV-Datei. Sie können Ausgabedateien mit Coalesce (1) kombinieren. Wenn Sie Header-Informationen benötigen, setzen Sie diese optional auf "true".
Spark_df.coalesce(1).write.option("header", "true").csv("/mnt/auditdata/AuditLog_2020-01-11_2020-04-11_edited.csv")
Stellen Sie sicher, dass die CSV-Datei im auf Databricks bereitgestellten Azure Data Lake Storage Gen2-Speicherkonto erstellt wurde. Wenn ich es herunterlade, scheint die CSV-Datei direkt unter dem Ordner mit dem angegebenen Dateinamen gespeichert zu sein.
(Referenz) CSV-Lesung ist wie folgt
``` python:Input
#Spark Dataframe
Spark_df = spark.read.format('csv').options(
header='true', inferschema='true').load("/mnt/auditdata/Spark_df.csv")
display (Spark_df)
#pandas
import pandas as pd
pd_dataframe = pd.read_csv('/dbfs/mnt/auditdata/Spark_df.csv')
```
Versuchen Sie auch, im Parkettformat zu schreiben.
Spark_df.write.mode("append").parquet("/mnt/auditdata/parquet/audit")
(Referenz) Das Lesen von Parkett ist wie folgt
``` python:Input
#Python
data = sqlContext.read.parquet("/mnt/auditdata/parquet/audit")
display(data)
#Scala
%scala
val data = sqlContext.read.parquet("/mnt/auditdata/parquet/audit")
display(data)
#SQL
%sql
CREATE TEMPORARY TABLE scalaTable
USING parquet
OPTIONS (
path "/mnt/auditdata/parquet/audit"
)
SELECT * FROM scalaTable
```
Versuchen Sie auch, im Databricks Table-Format zu schreiben.
Spark_df.write.saveAsTable("worktime")
worktime = spark.sql("select * from worktime")
display(worktime.select("*"))
Zuletzt erstellen wir einen Bericht mit den bisherigen Daten, damit sie in Power BI angezeigt werden können.
Starten Sie Databricks Workspace über das Azure-Portal, zeigen Sie im linken Bereich "Cluster" an und wählen Sie den Cluster aus, in dem die Tabelle ausgeführt wird, zu der eine Verbindung hergestellt werden soll.
Wählen Sie im Cluster-Einstellungsfeld Erweiterte Optionen aus, um das JDBC / ODBC-Menü anzuzeigen.
Der Einstellungsbildschirm enthält die folgenden Informationen.
Verwenden Sie diese Informationen, um die Zeichenfolge für die Verbindungszieleinstellung abzurufen.
https://<Hostname>:<Port>/<HTTP Path>
Insbesondere sollte es sich um eine Zeichenfolge wie die folgende handeln.
Server : https://xxx-xxx.1.azuredatabricks.net:443/sql/protocolv1/o/687887143797341/xxxx-xxxxxx-xxxxxxxx
Klicken Sie im Bildschirm zur Verwaltung des Databrick-Arbeitsbereichs oben rechts auf das Benutzerprofilsymbol und dann auf Benutzereinstellungen.
Klicken Sie auf die Registerkarte "Zugriffstoken" und dann auf die Schaltfläche "Neues Token generieren".
Schreiben Sie im Bildschirm "Neues Token generieren" "Power BI" in das Feld "Kommentar". Es ist eine Option, damit Sie es nicht schreiben müssen.
Klicken Sie auf die Schaltfläche "Generieren" und kopieren und speichern Sie das erstellte Token.
Starten Sie Power BI Desktop und wählen Sie "Spark" als Zieldatenquelle unter "Get Data".
Fügen Sie in den Spark-Verbindungseinstellungen die zuvor erhaltene Zeichenfolge für die Verbindungszieleinstellung in das Feld "Server" ein. Wählen Sie "HTTP" als Protokoll und "Direkte Abfrage" als Verbindungsmodus und klicken Sie auf die Schaltfläche "OK".
Geben Sie in den Spark-Verbindungseinstellungen "Token" in das Feld "Benutzername" ein und fügen Sie das zuvor erhaltene Passwort ein. Klicken Sie auf die Schaltfläche "Verbinden".
Die Liste der in Schritt 3 erstellten Tabellen wird angezeigt. Wählen Sie die für den Power BI-Bericht erforderliche Tabelle aus und klicken Sie auf die Schaltfläche "Laden".
Mit den in den Schritten 1 bis 3 vorbereiteten Daten habe ich schließlich einen solchen Bericht mit Power BI Desktop erstellt.
Dieses Mal habe ich versucht, mit der Protokollanalyse und -visualisierung mit Databricks fortzufahren. Es fühlt sich so an, als würden wir nur einen Teil des Potenzials von Databricks nutzen. In der Realität muss es in der Lage sein, sein wahres Potenzial in Situationen zu demonstrieren, in denen eine verteilte Verarbeitung für Data Lake erforderlich ist, das eine große Datenmenge angesammelt hat.
Dennoch ist es eine vielseitige Verarbeitungsplattform, die in jeder Sprache wie Scala, Python, R, SQL verwendet werden kann. Es ist wunderbar, dass sie Stream-Verarbeitung, maschinelles Lernen und Visualisierung durchführen und auch mit verschiedenen Azure-Diensten einschließlich Power BI verknüpft werden kann. Ich fühlte, dass.
Wir empfehlen Azure Databricks mit Vertrauen allen, die Daten haben, sich aber fragen, wie sie verwendet werden sollen oder Probleme mit der Datenverarbeitung haben.
Ich war auch an einer Verknüpfung mit der Azure SQL-Datenbank und Cosmos DB interessiert, daher werde ich es beim nächsten Mal versuchen.
Recommended Posts