Beim Erstellen eines AI-Modells mit Zeitreihendaten wird häufig der Verarbeitungsablauf für die Berechnung der Merkmalsmenge mithilfe eines Schiebefensters und das anschließende Training mit verschiedenen Algorithmen implementiert. In Bezug auf die Merkmalsmengenextraktion dieser Zeitreihendaten ist die Schwierigkeit der Verarbeitung schwierig, da praktische Pakete wie "Pandas" und "Numpy" nicht für Daten mit großer Kapazität (sogenannte Big Data) verwendet werden können, die nicht in den Speicher passen. geh hinauf. Wie erreichen Sie das? In diesem Artikel werde ich vorstellen, wie man "PySpark" als Beispiel verwendet. Da die Anzahl der Sätze groß sein wird, werde ich sie in zwei Teilen einführen.
Da es sich um eine Basisversion handelt, werde ich dieses Mal eine allgemeine "Methode zur Berechnung von Features mithilfe eines Schiebefensters mit PySpark" vorstellen.
Ich habe Azure Synapse Analytics als Ausführungsumgebung für "PySpark" verwendet. Die Hauptpaketversionen sind wie folgt. (Standardeinstellung ab 20. August 2020)
Apache Spark 2.4
Python version 3.6.1
Ich werde die Methode zum Erstellen der Verifizierungsumgebung weglassen. Ich möchte einen Artikel zum Erstellen einer Spark-Ausführungsumgebung mit Azure Synapse Analytics schreiben, wenn Sie eine Anfrage haben. Wir würden uns freuen, wenn Sie in den Kommentaren anfordern könnten.
Definieren Sie die entsprechenden Daten als PySpark-Datenrahmen.
df = sqlContext.createDataFrame([
(1, 2.65,2.42,6.90,4.93),
(2, 2.57,8.50,2.40,5.37),
(3, 2.13,3.76,7.52,7.67),
(4, 3.09,7.28,3.59,6.34),
(5, 5.75,4.69,5.26,3.11),
(6, 6.91,4.04,2.03,6.28),
(7, 5.44,3.22,2.87,7.14),
(8, 4.86,7.47,3.68,0.32),
(9, 9.70,7.43,4.43,7.74),
(10,6.30,7.72,7.78,7.91),
],
["time", "data1", "data2", "data3", "data4"])
df.show()
# +----+-----+-----+-----+-----+
# |time|data1|data2|data3|data4|
# +----+-----+-----+-----+-----+
# | 1| 2.65| 2.42| 6.9| 4.93|
# | 2| 2.57| 8.5| 2.4| 5.37|
# | 3| 2.13| 3.76| 7.52| 7.67|
# | 4| 3.09| 7.28| 3.59| 6.34|
# | 5| 5.75| 4.69| 5.26| 3.11|
# | 6| 6.91| 4.04| 2.03| 6.28|
# | 7| 5.44| 3.22| 2.87| 7.14|
# | 8| 4.86| 7.47| 3.68| 0.32|
# | 9| 9.7| 7.43| 4.43| 7.74|
# | 10| 6.3| 7.72| 7.78| 7.91|
# +----+-----+-----+-----+-----+
Stellen Sie sich vor, dass die folgenden Daten erhalten wurden.
Spaltenname | Bedeutung |
---|---|
time | Aufnahmezeit (Sekunden) |
data1~data6 | Messdaten |
Das Schiebefenster von "PySpark" wird durch "Windows" von "pyspark.sql.window" definiert. Hier wird ein Schiebefenster mit einer Fensterbreite von 5 Sekunden definiert.
from pyspark.sql.window import Window
import pyspark.sql.functions as F
# sliding-Fenstereinstellungen
window_size = 5
sliding_window = Window.orderBy(F.col("time")).rowsBetween(Window.currentRow, window_size-1)
Der Sortierschlüssel wird durch "orderBy (" Spaltenname ")" angegeben. Die Angabe des Sortierschlüssels ist sehr wichtig, da "Spark" die Verarbeitungsreihenfolge nicht garantiert. In diesem Beispiel wird die Reihenfolge der "Zeit", die die Aufnahmezeit angibt, dh "Zeit", in aufsteigender Reihenfolge angeordnet und dann in der Reihenfolge des ersten Datensatzes verarbeitet. Geben Sie daher "orderBy (F.col (" time "))" an. tun. Übrigens wird es standardmäßig von ASC (aufsteigende Reihenfolge) verarbeitet. Wenn Sie mit DESC (absteigende Reihenfolge) verarbeiten möchten, schreiben Sie wie folgt.
sliding_window = Window.orderBy(F.col("time").desc()).rowsBetween(Window.currentRow, window_size-1)
Wenn Sie ".desc ()" zu "F.col (" time ")" hinzufügen, wird dies in absteigender Reihenfolge behandelt.
Als nächstes wird die Fensterbreite durch "rowsBetween (Window.currentRow, window_size-1)" definiert. Das erste Argument ist die Definition der Startposition, wobei Window.currentRow
und die aktuelle Zeile angegeben werden. Das zweite Argument ist die Definition der Endposition, wobei window_size-1
und 4 Zeilen voraus (4 Sekunden voraus) von der aktuellen Zeile angegeben werden. Damit können die Daten für 5 Zeilen (5 Sekunden) bis zu 4 Zeilen voraus, einschließlich der aktuellen Zeile, als ein Fenster definiert werden.
Führen Sie die Merkmalmengenextraktion mithilfe der zuvor festgelegten Definition des Schiebefensters durch. Versuchen Sie, die Werte "max (Maximalwert)", "min (Minimalwert)" und "avg (Durchschnittswert)" in der Fensterbreite für "data1" abzurufen.
df.withColumn('feat_max_data1', F.max('data1').over(sliding_window))\
.withColumn('feat_min_data1', F.min('data1').over(sliding_window))\
.withColumn('feat_avg_data1', F.avg('data1').over(sliding_window))\
.select('time', 'data1', 'feat_max_data1', 'feat_min_data1', 'feat_avg_data1')\
.show()
# +----+-----+--------------+--------------+------------------+
# |time|data1|feat_max_data1|feat_min_data1| feat_avg_data1|
# +----+-----+--------------+--------------+------------------+
# | 1| 2.65| 5.75| 2.13|3.2379999999999995|
# | 2| 2.57| 6.91| 2.13| 4.09|
# | 3| 2.13| 6.91| 2.13| 4.664|
# | 4| 3.09| 6.91| 3.09| 5.21|
# | 5| 5.75| 9.7| 4.86| 6.531999999999999|
# | 6| 6.91| 9.7| 4.86| 6.642|
# | 7| 5.44| 9.7| 4.86| 6.575|
# | 8| 4.86| 9.7| 4.86| 6.953333333333333|
# | 9| 9.7| 9.7| 6.3| 8.0|
# | 10| 6.3| 6.3| 6.3| 6.3|
# +----+-----+--------------+--------------+------------------+
Das Ergebnis des durch den angegebenen Spaltennamen angegebenen Verarbeitungsinhalts wird als neue Spalte des Datenrahmens mit "withColumn (" Spaltenname ", Verarbeitungsinhalt)" hinzugefügt. Wenn man sich den Verarbeitungscode ansieht, der "max" berechnet, ist es "withColumn" ("feat_max_data1", F.max ("data1"). Over (sliding_window)) "und es wird das" max "von" data1 "verwendet. Unter der Bedingung "over (sliding_window)". Das Ergebnis wird als "feat_max_data1" -Spalte hinzugefügt. " Die Schiebefensterspezifikation in "PySpark" wird durch "over ()" definiert. Da "PySpark" die Verarbeitung einzeln definiert, müssen mehrere Verarbeitungscodes aufgelistet werden, wenn mehrere Merkmalsmengen aus einer Spalte erfasst werden, wie in diesem Beispiel.
Bisher haben wir die Grundlagen von "Berechnen von Features mithilfe eines Schiebefensters mit PySpark" eingeführt. Die diesmal eingeführte Verarbeitungsmethode ist eine allgemeine Methode, und ich denke, dass sie ausreicht, wenn die Datenmenge gering oder die Menge der zu extrahierenden Merkmale gering ist. Wenn jedoch die Datenmenge groß ist, die Anzahl der zu verarbeitenden Spalten groß ist oder die Menge der zu extrahierenden Merkmale groß ist, führt dieses Verarbeitungsverfahren zu einer schlechten Verarbeitungseffizienz und hohen Verarbeitungskosten. Es ist jedoch möglich, die Verarbeitungskosten durch Entwickeln des Verarbeitungsverfahrens dramatisch zu verbessern. Als erweiterte Version werde ich das nächste Mal vorstellen, welche Art von Gerät für eine effizientere Verarbeitung verwendet werden kann.
Danke fürs Lesen.
Recommended Posts