So ermitteln Sie die kumulative Summe für jede Gruppe mithilfe von DataFrame in Spark [Python-Version]

Dies ist eine Methode zum Berechnen der kumulierten Summe beim Gruppieren und Sortieren von Spalten mithilfe der Fensterfunktion von Spark's Python-Version DataFrame.

Es ist eine Methode, nach der ich gesucht habe, als ich mir die offizielle Python-API-Dokumentation angesehen habe. Es gibt also möglicherweise einen besseren Weg. Die verwendete Version von Spark ist 1.5.2.

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html

Beispieldaten

Bereiten Sie die Testdaten in der PostgreSQL-Tabelle vor und laden Sie sie als DataFrame in pyspark.

$ SPARK_CLASSPATH=postgresql-9.4-1202.jdbc41.jar PYSPARK_DRIVER_PYTHON=ipython pyspark
(..snip..)
In [1]: df = sqlContext.read.format('jdbc').options(url='jdbc:postgresql://localhost:5432/postgres?user=postgres', dbtable='public.foo').load()
(..snip..)
In [2]: df.printSchema()
root
 |-- a: integer (nullable = true)
 |-- b: timestamp (nullable = true)
 |-- c: integer (nullable = true)
In [4]: df.show()
+---+--------------------+---+
|  a|                   b|  c|
+---+--------------------+---+
|  1|2015-11-22 10:00:...|  1|
|  1|2015-11-22 10:10:...|  2|
|  1|2015-11-22 10:20:...|  3|
|  1|2015-11-22 10:30:...|  4|
|  1|2015-11-22 10:40:...|  5|
|  1|2015-11-22 10:50:...|  6|
|  1|2015-11-22 11:00:...|  7|
|  1|2015-11-22 11:10:...|  8|
|  1|2015-11-22 11:20:...|  9|
|  1|2015-11-22 11:30:...| 10|
|  1|2015-11-22 11:40:...| 11|
|  1|2015-11-22 11:50:...| 12|
|  1|2015-11-22 12:00:...| 13|
|  2|2015-11-22 10:00:...|  1|
|  2|2015-11-22 10:10:...|  2|
|  2|2015-11-22 10:20:...|  3|
|  2|2015-11-22 10:30:...|  4|
|  2|2015-11-22 10:40:...|  5|
|  2|2015-11-22 10:50:...|  6|
|  2|2015-11-22 11:00:...|  7|
+---+--------------------+---+
only showing top 20 rows

Spalte a dient zur Gruppierung, Spalte b zur Sortierung und Spalte c zur Berechnung.

Kumulative Summe für jede Spaltengruppe

Sortieren Sie beim Gruppieren nach Spalte a nach Spalte b und nehmen Sie die kumulative Summe von Spalte c.

Erstens die Definition von Fenster

In [6]: from pyspark.sql.Window import Window

In [7]: from pyspark.sql import functions as func

In [8]: window = Window.partitionpartitionBy(df.a).orderBy(df.b).rangeBetween(-sys.maxsize,0)

In [9]: window
Out[9]: <pyspark.sql.window.WindowSpec at 0x18368d0>

Erstellen Sie in diesem Fenster eine Spalte, die pyspark.sql.functions.sum () berechnet

In [10]: cum_c = func.sum(df.c).over(window)

In [11]: cum_c
Out[11]: Column<'sum(c) WindowSpecDefinition UnspecifiedFrame>

Erstellen Sie einen neuen DataFrame, indem Sie diese Spalte an den ursprünglichen DataFrame anhängen

In [12]: mod_df = df.withColumn("cum_c", cum_c)

In [13]: mod_df
Out[13]: DataFrame[a: int, b: timestamp, c: int, cum_c: bigint]

In [14]: mod_df.printSchema()
root
 |-- a: integer (nullable = true)
 |-- b: timestamp (nullable = true)
 |-- c: integer (nullable = true)
 |-- cum_c: long (nullable = true)


In [15]: mod_df.show()
+---+--------------------+---+-----+
|  a|                   b|  c|cum_c|
+---+--------------------+---+-----+
|  1|2015-11-22 10:00:...|  1|    1|
|  1|2015-11-22 10:10:...|  2|    3|
|  1|2015-11-22 10:20:...|  3|    6|
|  1|2015-11-22 10:30:...|  4|   10|
|  1|2015-11-22 10:40:...|  5|   15|
|  1|2015-11-22 10:50:...|  6|   21|
|  1|2015-11-22 11:00:...|  7|   28|
|  1|2015-11-22 11:10:...|  8|   36|
|  1|2015-11-22 11:20:...|  9|   45|
|  1|2015-11-22 11:30:...| 10|   55|
|  1|2015-11-22 11:40:...| 11|   66|
|  1|2015-11-22 11:50:...| 12|   78|
|  1|2015-11-22 12:00:...| 13|   91|
|  2|2015-11-22 10:00:...|  1|    1|
|  2|2015-11-22 10:10:...|  2|    3|
|  2|2015-11-22 10:20:...|  3|    6|
|  2|2015-11-22 10:30:...|  4|   10|
|  2|2015-11-22 10:40:...|  5|   15|
|  2|2015-11-22 10:50:...|  6|   21|
|  2|2015-11-22 11:00:...|  7|   28|
+---+--------------------+---+-----+
only showing top 20 rows

Sie können es berechnen.

Summe jeder Spaltengruppe

Berechnen Sie nun die Summe der Spalte c für jede Gruppe der Spalte a. Setzen Sie den DataFrame mit groupBy () auf pyspark.sql.GroupedData und verwenden Sie pyspark.sql.GroupedData.sum (). Es ist kompliziert mit sum (), aber seien Sie vorsichtig, denn wenn Sie die Option Column als Argument haben, erhalten Sie eine Fehlermeldung.

In [25]: sum_c_df = df.groupBy('a').sum('c')

Im Gegensatz zu zuvor ist dies keine Fensterfunktion, sodass das zurückgegebene Ergebnis ein DataFrame ist. Darüber hinaus wird der Spaltenname, in dem die Summe gespeichert ist, willkürlich festgelegt.

In [26]: sum_c_df
Out[26]: DataFrame[a: int, sum(c): bigint]

Nun, es ist kompliziert.

Fügen Sie es vorerst als Spalte an den ursprünglichen DataFrame an.

In [27]: mod_df3 = mod_df2.join('a'sum_c_df, 'a'()

In [28]: mod_df3.printSchema()
root
 |-- a: integer (nullable = true)
 |-- b: timestamp (nullable = true)
 |-- c: integer (nullable = true)
 |-- cum_c: long (nullable = true)
 |-- sum(c): long (nullable = true)


In [29]: mod_df3.show()
(..snip..)
+---+--------------------+---+-------+------+
|  a|                   b|  c|  cum_c|sum(c)|
+---+--------------------+---+-------+------+
|  1|2015-11-22 10:00:...|  1|      1|    91|
|  1|2015-11-22 10:10:...|  2|      3|    91|
|  1|2015-11-22 10:20:...|  3|      6|    91|
|  1|2015-11-22 10:30:...|  4|     10|    91|
|  1|2015-11-22 10:40:...|  5|     15|    91|
|  1|2015-11-22 10:50:...|  6|     21|    91|
|  1|2015-11-22 11:00:...|  7|     28|    91|
|  1|2015-11-22 11:10:...|  8|     36|    91|
|  1|2015-11-22 11:20:...|  9|     45|    91|
|  1|2015-11-22 11:30:...| 10|     55|    91|
|  1|2015-11-22 11:40:...| 11|     66|    91|
|  1|2015-11-22 11:50:...| 12|     78|    91|
|  1|2015-11-22 12:00:...| 13|     91|    91|
|  2|2015-11-22 10:00:...|  1|      1|    91|
|  2|2015-11-22 10:10:...|  2|      3|    91|
|  2|2015-11-22 10:20:...|  3|      6|    91|
|  2|2015-11-22 10:30:...|  4|     10|    91|
|  2|2015-11-22 10:40:...|  5|     15|    91|
|  2|2015-11-22 10:50:...|  6|     21|    91|
|  2|2015-11-22 11:00:...|  7|     28|    91|
+---+--------------------+---+-------+------+
only showing top 20 rows

Sie haben die Summe für jede Gruppe erfolgreich berechnet.

(Gesamtsumme) für jede Spaltengruppe

Berechnen wir nun den verbleibenden Wert bis zur Summe für Spalte c. Das heißt, die Gesamtsumme.

In [30]: diff_sum_c = mod_df3[('sum(c)'] - mod_df3['cum_c']

In [31]: mod_df4 = mod_df3.withColumn("diff_sum_c", diff_sum_c)

In [34]: mod_df4.show()
(..snip..)
+---+--------------------+---+-------+------+----------+
|  a|                   b|  c|cum_c_2|sum(c)|diff_sum_c|
+---+--------------------+---+-------+------+----------+
|  1|2015-11-22 10:00:...|  1|      1|    91|        90|
|  1|2015-11-22 10:10:...|  2|      3|    91|        88|
|  1|2015-11-22 10:20:...|  3|      6|    91|        85|
|  1|2015-11-22 10:30:...|  4|     10|    91|        81|
|  1|2015-11-22 10:40:...|  5|     15|    91|        76|
|  1|2015-11-22 10:50:...|  6|     21|    91|        70|
|  1|2015-11-22 11:00:...|  7|     28|    91|        63|
|  1|2015-11-22 11:10:...|  8|     36|    91|        55|
|  1|2015-11-22 11:20:...|  9|     45|    91|        46|
|  1|2015-11-22 11:30:...| 10|     55|    91|        36|
|  1|2015-11-22 11:40:...| 11|     66|    91|        25|
|  1|2015-11-22 11:50:...| 12|     78|    91|        13|
|  1|2015-11-22 12:00:...| 13|     91|    91|         0|
|  2|2015-11-22 10:00:...|  1|      1|    91|        90|
|  2|2015-11-22 10:10:...|  2|      3|    91|        88|
|  2|2015-11-22 10:20:...|  3|      6|    91|        85|
|  2|2015-11-22 10:30:...|  4|     10|    91|        81|
|  2|2015-11-22 10:40:...|  5|     15|    91|        76|
|  2|2015-11-22 10:50:...|  6|     21|    91|        70|
|  2|2015-11-22 11:00:...|  7|     28|    91|        63|
+---+--------------------+---+-------+------+----------+
only showing top 20 rows

Ergänzung

Wie ich diesmal bemerkt habe, scheint die Verwendung von SPARK_CLASSPATH in Spark 1.0 und höher nicht empfohlen zu werden. Als ich mit pyspark anfing, bekam ich die folgende Nachricht.

15/11/22 12:32:44 WARN spark.SparkConf: 
SPARK_CLASSPATH was detected (set to 'postgresql-9.4-1202.jdbc41.jar').
This is deprecated in Spark 1.0+.

Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath
  

Bei Verwendung eines Clusters wird diese Umgebungsvariable anscheinend nicht korrekt auf verschiedenen Servern übertragen. Daher wird empfohlen, einen anderen Parameter zu verwenden.

Umm. Ich muss den Unterschied zwischen lokaler und verteilter Umgebung verstehen.

Recommended Posts

So ermitteln Sie die kumulative Summe für jede Gruppe mithilfe von DataFrame in Spark [Python-Version]
So erhalten Sie die Python-Version
So stellen Sie die Ausgabeauflösung für jeden Keyframe in Blender ein
[Einführung in Python] Wie verwende ich den Operator in in der for-Anweisung?
So überprüfen Sie die Version von opencv mit Python
Wechseln Sie das zu ladende Modul für jede Ausführungsumgebung in Python
Passen Sie die Verteilung jeder Gruppe in Python an
So geben Sie die TLS-Version in Python-Anforderungen an
So finden Sie die Korrelation für kategoriale Variablen
[Circuit x Python] So ermitteln Sie die Übertragungsfunktion eines Schaltkreises mit Lcapy
So ermitteln Sie den Koeffizienten der ungefähren Kurve, die in Python durch die Scheitelpunkte verläuft
Finden Sie die kumulative Verteilungsfunktion durch Sortieren (Python-Version)
So rufen Sie den n-ten größten Wert in Python ab
[Für Anfänger] Wie man den Befehl say mit Python benutzt!
So erhalten Sie den Variablennamen selbst in Python
So ermitteln Sie die Anzahl der Stellen in Python
Wie Sie das aktuelle Verzeichnis in Python in Blender kennen
So senden Sie Microsoft Forms automatisch mit Python (Mac-Version)
Beenden bei Verwendung von Python in Terminal (Mac)
So rufen Sie mehrere Arrays mit Slice in Python ab.
[Einführung in Python] Wie stoppe ich die Schleife mit break?
So führen Sie einen Befehl mit einem Unterprozess in Python aus
[Python] So geben Sie Listenwerte der Reihe nach aus
Konvertierung des gesamten Typs für jede Datenrahmenspalte von Python
[Einführung in Python] So schreiben Sie sich wiederholende Anweisungen mit for-Anweisungen
Ich möchte nur das 95% -Konfidenzintervall des Unterschieds im Bevölkerungsverhältnis in Python ermitteln
So testen Sie eine Funktion, die die aktuelle Zeit enthält, mit Freezegun in Python
Geben Sie für jede Datei die angegebene Tabelle der Oracle-Datenbank in Python in Excel aus
So zählen Sie die Anzahl der Vorkommen jedes Elements in der Liste in Python mit der Gewichtung
So legen Sie die Entwicklungsumgebung für jedes Projekt mit VSCode + Python-Erweiterung + Miniconda fest
So finden Sie das erste Element, das den Kriterien in der Python-Liste entspricht
So finden Sie die optimale Anzahl von Clustern für k-means
[Python] So überprüfen Sie, ob der Schlüssel im Wörterbuch vorhanden ist
Konvertieren Sie mit Python für .NET von Pandas DataFrame in System.Data.DataTable
Überprüfen Sie die Funktionsweise von Python für .NET in jeder Umgebung
[Python] Verwendung von Matplotlib, einer Bibliothek zum Zeichnen von Diagrammen
Finde Fehler in Python
Verwendung der Methode __call__ in der Python-Klasse
So definieren Sie mehrere Variablen in einer Python for-Anweisung
Festlegen der Cache-Steuerung für den BLOB-Speicher in Azure Storage in Python
So generieren Sie eine Abfrage mit dem IN-Operator in Django
So erhalten Sie den letzten (letzten) Wert in einer Liste in Python
So implementieren Sie Python EXE für Windows mit Docker-Container
Ich wusste nicht, wie ich die [Python] für die Anweisung verwenden sollte
Finden Sie heraus, wie viele Zeichen sich in der Zeichenfolge befinden.
So ändern Sie die Python-Version
Wie man in Python entwickelt
Übergeben des Ausführungsergebnisses eines Shell-Befehls in einer Liste in Python (nicht blockierende Version)
So führen Sie den Befehl sed mit der for-Anweisung mehrmals aus
Dinge, auf die Sie achten müssen, wenn Sie Standardargumente in Python verwenden
So bestimmen Sie die Existenz eines Selenelements in Python
So ändern Sie die Protokollstufe von Azure SDK für Python
Wie Sie die interne Struktur eines Objekts in Python kennen
python / pandas / dataframe / So erhalten Sie die einfachste Zeile / Spalte / Index / Spalte
Das 15. Offline-Echtzeit-Schreiben eines Referenzproblems in Python
So erhalten Sie mithilfe der Mastodon-API Follower und Follower von Python
17. Offline-Echtzeit So lösen Sie Schreibprobleme mit Python
So überprüfen Sie die Speichergröße einer Variablen in Python
Ändern Sie automatisch die Größe von Screenshots für den App Store für jeden Bildschirm in Python
So stellen Sie fest, dass in Python3 ein Kreuzschlüssel eingegeben wurde