[PYTHON] So extrahieren Sie Funktionen von Zeitreihendaten mit PySpark Basics

Einführung

Beim Erstellen eines AI-Modells mit Zeitreihendaten wird häufig der Verarbeitungsablauf für die Berechnung der Merkmalsmenge mithilfe eines Schiebefensters und das anschließende Training mit verschiedenen Algorithmen implementiert. In Bezug auf die Merkmalsmengenextraktion dieser Zeitreihendaten ist die Schwierigkeit der Verarbeitung schwierig, da praktische Pakete wie "Pandas" und "Numpy" nicht für Daten mit großer Kapazität (sogenannte Big Data) verwendet werden können, die nicht in den Speicher passen. geh hinauf. Wie erreichen Sie das? In diesem Artikel werde ich vorstellen, wie man "PySpark" als Beispiel verwendet. Da die Anzahl der Sätze groß sein wird, werde ich sie in zwei Teilen einführen.

Da es sich um eine Basisversion handelt, werde ich dieses Mal eine allgemeine "Methode zur Berechnung von Features mithilfe eines Schiebefensters mit PySpark" vorstellen.

Überprüfungsumgebung

Ich habe Azure Synapse Analytics als Ausführungsumgebung für "PySpark" verwendet. Die Hauptpaketversionen sind wie folgt. (Standardeinstellung ab 20. August 2020)

Apache Spark 2.4
Python version 3.6.1

So berechnen Sie Features mithilfe eines Schiebefensters in PySpark

Ich werde die Methode zum Erstellen der Verifizierungsumgebung weglassen. Ich möchte einen Artikel zum Erstellen einer Spark-Ausführungsumgebung mit Azure Synapse Analytics schreiben, wenn Sie eine Anfrage haben. Wir würden uns freuen, wenn Sie in den Kommentaren anfordern könnten.

1. Datenaufbereitung

Definieren Sie die entsprechenden Daten als PySpark-Datenrahmen.

df = sqlContext.createDataFrame([
    (1, 2.65,2.42,6.90,4.93),
    (2, 2.57,8.50,2.40,5.37),
    (3, 2.13,3.76,7.52,7.67),
    (4, 3.09,7.28,3.59,6.34),
    (5, 5.75,4.69,5.26,3.11),
    (6, 6.91,4.04,2.03,6.28),
    (7, 5.44,3.22,2.87,7.14),
    (8, 4.86,7.47,3.68,0.32),
    (9, 9.70,7.43,4.43,7.74),
    (10,6.30,7.72,7.78,7.91),
    ], 
    ["time", "data1", "data2", "data3", "data4"])

df.show()

# +----+-----+-----+-----+-----+
# |time|data1|data2|data3|data4|
# +----+-----+-----+-----+-----+
# |   1| 2.65| 2.42|  6.9| 4.93|
# |   2| 2.57|  8.5|  2.4| 5.37|
# |   3| 2.13| 3.76| 7.52| 7.67|
# |   4| 3.09| 7.28| 3.59| 6.34|
# |   5| 5.75| 4.69| 5.26| 3.11|
# |   6| 6.91| 4.04| 2.03| 6.28|
# |   7| 5.44| 3.22| 2.87| 7.14|
# |   8| 4.86| 7.47| 3.68| 0.32|
# |   9|  9.7| 7.43| 4.43| 7.74|
# |  10|  6.3| 7.72| 7.78| 7.91|
# +----+-----+-----+-----+-----+

Stellen Sie sich vor, dass die folgenden Daten erhalten wurden.

Spaltenname Bedeutung
time Aufnahmezeit (Sekunden)
data1~data6 Messdaten

2. Definition des Schiebefensters

Das Schiebefenster von "PySpark" wird durch "Windows" von "pyspark.sql.window" definiert. Hier wird ein Schiebefenster mit einer Fensterbreite von 5 Sekunden definiert.

from pyspark.sql.window import Window
import pyspark.sql.functions as F

# sliding-Fenstereinstellungen
window_size = 5
sliding_window = Window.orderBy(F.col("time")).rowsBetween(Window.currentRow, window_size-1)

Der Sortierschlüssel wird durch "orderBy (" Spaltenname ")" angegeben. Die Angabe des Sortierschlüssels ist sehr wichtig, da "Spark" die Verarbeitungsreihenfolge nicht garantiert. In diesem Beispiel wird die Reihenfolge der "Zeit", die die Aufnahmezeit angibt, dh "Zeit", in aufsteigender Reihenfolge angeordnet und dann in der Reihenfolge des ersten Datensatzes verarbeitet. Geben Sie daher "orderBy (F.col (" time "))" an. tun. Übrigens wird es standardmäßig von ASC (aufsteigende Reihenfolge) verarbeitet. Wenn Sie mit DESC (absteigende Reihenfolge) verarbeiten möchten, schreiben Sie wie folgt.

sliding_window = Window.orderBy(F.col("time").desc()).rowsBetween(Window.currentRow, window_size-1)

Wenn Sie ".desc ()" zu "F.col (" time ")" hinzufügen, wird dies in absteigender Reihenfolge behandelt.

Als nächstes wird die Fensterbreite durch "rowsBetween (Window.currentRow, window_size-1)" definiert. Das erste Argument ist die Definition der Startposition, wobei Window.currentRow und die aktuelle Zeile angegeben werden. Das zweite Argument ist die Definition der Endposition, wobei window_size-1 und 4 Zeilen voraus (4 Sekunden voraus) von der aktuellen Zeile angegeben werden. Damit können die Daten für 5 Zeilen (5 Sekunden) bis zu 4 Zeilen voraus, einschließlich der aktuellen Zeile, als ein Fenster definiert werden.

3. Merkmalsberechnung

Führen Sie die Merkmalmengenextraktion mithilfe der zuvor festgelegten Definition des Schiebefensters durch. Versuchen Sie, die Werte "max (Maximalwert)", "min (Minimalwert)" und "avg (Durchschnittswert)" in der Fensterbreite für "data1" abzurufen.

df.withColumn('feat_max_data1', F.max('data1').over(sliding_window))\
  .withColumn('feat_min_data1', F.min('data1').over(sliding_window))\
  .withColumn('feat_avg_data1', F.avg('data1').over(sliding_window))\
  .select('time', 'data1', 'feat_max_data1', 'feat_min_data1', 'feat_avg_data1')\
  .show()

# +----+-----+--------------+--------------+------------------+
# |time|data1|feat_max_data1|feat_min_data1|    feat_avg_data1|
# +----+-----+--------------+--------------+------------------+
# |   1| 2.65|          5.75|          2.13|3.2379999999999995|
# |   2| 2.57|          6.91|          2.13|              4.09|
# |   3| 2.13|          6.91|          2.13|             4.664|
# |   4| 3.09|          6.91|          3.09|              5.21|
# |   5| 5.75|           9.7|          4.86| 6.531999999999999|
# |   6| 6.91|           9.7|          4.86|             6.642|
# |   7| 5.44|           9.7|          4.86|             6.575|
# |   8| 4.86|           9.7|          4.86| 6.953333333333333|
# |   9|  9.7|           9.7|           6.3|               8.0|
# |  10|  6.3|           6.3|           6.3|               6.3|
# +----+-----+--------------+--------------+------------------+

Das Ergebnis des durch den angegebenen Spaltennamen angegebenen Verarbeitungsinhalts wird als neue Spalte des Datenrahmens mit "withColumn (" Spaltenname ", Verarbeitungsinhalt)" hinzugefügt. Wenn man sich den Verarbeitungscode ansieht, der "max" berechnet, ist es "withColumn" ("feat_max_data1", F.max ("data1"). Over (sliding_window)) "und es wird das" max "von" data1 "verwendet. Unter der Bedingung "over (sliding_window)". Das Ergebnis wird als "feat_max_data1" -Spalte hinzugefügt. " Die Schiebefensterspezifikation in "PySpark" wird durch "over ()" definiert. Da "PySpark" die Verarbeitung einzeln definiert, müssen mehrere Verarbeitungscodes aufgelistet werden, wenn mehrere Merkmalsmengen aus einer Spalte erfasst werden, wie in diesem Beispiel.

Zusammenfassung

Bisher haben wir die Grundlagen von "Berechnen von Features mithilfe eines Schiebefensters mit PySpark" eingeführt. Die diesmal eingeführte Verarbeitungsmethode ist eine allgemeine Methode, und ich denke, dass sie ausreicht, wenn die Datenmenge gering oder die Menge der zu extrahierenden Merkmale gering ist. Wenn jedoch die Datenmenge groß ist, die Anzahl der zu verarbeitenden Spalten groß ist oder die Menge der zu extrahierenden Merkmale groß ist, führt dieses Verarbeitungsverfahren zu einer schlechten Verarbeitungseffizienz und hohen Verarbeitungskosten. Es ist jedoch möglich, die Verarbeitungskosten durch Entwickeln des Verarbeitungsverfahrens dramatisch zu verbessern. Als erweiterte Version werde ich das nächste Mal vorstellen, welche Art von Gerät für eine effizientere Verarbeitung verwendet werden kann.

Danke fürs Lesen.

Recommended Posts

So extrahieren Sie Funktionen von Zeitreihendaten mit PySpark Basics
Versuchen Sie, Merkmale von Sensordaten mit CNN zu extrahieren
Umgang mit Zeitreihendaten (Implementierung)
Zeigen Sie Details zu Zeitreihendaten mit Remotte an
Lesen von Zeitreihendaten in PyTorch
Ich habe versucht, Funktionen mit SIFT von OpenCV zu extrahieren
So extrahieren Sie mit Pandas Daten, denen der Wert nan nicht fehlt
So extrahieren Sie mit Pandas Daten, denen der Wert nan nicht fehlt
Ich wollte nur die Daten des gewünschten Datums und der gewünschten Uhrzeit mit Django extrahieren
Umgang mit unausgeglichenen Daten
Aufblasen von Daten (Datenerweiterung) mit PyTorch
Differenzierung von Zeitreihendaten (diskret)
Zeitreihenanalyse 3 Vorverarbeitung von Zeitreihendaten
So berechnen Sie die Summe oder den Durchschnitt von Zeitreihen-CSV-Daten in einem Augenblick
<Pandas> Umgang mit Zeitreihendaten in der Pivot-Tabelle
Vorhersage von Zeitreihendaten durch Simplex-Projektion
Vorhersage von Zeitreihendaten mit einem neuronalen Netzwerk
So vergleichen Sie Zeitreihendaten-Derivative DTW, DTW-
Jupyter Notebook Grundlagen der Verwendung
Grundlagen von PyTorch (1) - Verwendung von Tensor-
Wie man Problemdaten mit Paiza liest
[Einführung in Data Scientist] Grundlagen von Python ♬
Zeichnen Sie die CSV von Zeitreihendaten mit einem Unixtime-Wert in Python (matplotlib).
Erstellen von CSV-Beispieldaten mit Hypothese
Konvertieren Sie Daten mit Form (Anzahl der Daten, 1) in (Anzahl der Daten,) mit numpy.
So implementieren Sie die Time-Wait-Verarbeitung mit wxpython
Erfassung von Zeitreihendaten (täglich) von Aktienkursen
So kratzen Sie Pferderenndaten mit Beautiful Soup
So legen Sie Attribute mit Mock of Python fest
So implementieren Sie "named_scope" von RubyOnRails mit Django
Glättung von Zeitreihen und Wellenformdaten 3 Methoden (Glättung)
So messen Sie die Ausführungszeit mit Python Teil 1
So messen Sie die Ausführungszeit mit Python Part 2
Zusammenfassung zum Lesen numerischer Daten mit Python [CSV, NetCDF, Fortran Binary]
[Einführung in Python] So erhalten Sie den Datenindex mit der for-Anweisung
Verwendung von xgboost: Mehrklassenklassifizierung mit Irisdaten
Ich habe versucht, "Grundlagen der Zeitreihenanalyse und des Zustandsraummodells" (Hayamoto) mit Pystan zu implementieren
Merkmalsmenge, die aus Zeitreihendaten extrahiert werden kann
So kratzen Sie Bilddaten von Flickr mit Python
Abnormalitätserkennung von Zeitreihendaten durch LSTM (Keras)
So konvertieren Sie horizontal gehaltene Daten mit Pandas in vertikal gehaltene Daten
So messen Sie die Wiedergabezeit von MP3-Dateien mit Python
So erhalten Sie mit SQLAlchemy + MySQLdb mehr als 1000 Daten
Verwendung von Python Kivy ~ ~ Grundlagen der Kv-Sprache ~
Zeitreihenanalyse 1 Grundlagen
So extrahieren Sie Nullwerte und Nicht-Nullwerte mit Pandas
So geben Sie die CSV eines mehrzeiligen Headers mit Pandas aus
Ableiten der MAP-Schätzung von HMM mit PyStruct
Grundlagen von PyTorch (2) - Wie erstelle ich ein neuronales Netzwerk?
Implementierung der Clustering-K-Form-Methode für Zeitreihendaten [Unüberwachtes Lernen mit Python Kapitel 13]
Ableiten der MAP-Schätzung von HMM mit OpenGM
Wie man strukturiertes SVM von ChainCRF mit PyStruct lernt
[Grundlagen der Datenwissenschaft] Sammeln von Daten aus RSS mit Python
Zusammenfassung, wie der Status mit mehreren Funktionen geteilt wird
Beispiel für das Aggregieren einer großen Menge von Zeitreihendaten mit Python in einer kleinen Speicherumgebung mit einer angemessenen Geschwindigkeit
[Einführung in das SIR-Modell] Prognostizieren Sie die Endzeit jedes Landes mit der COVID-19-Datenanpassung ♬