Dies ist eine Methode zum Berechnen der kumulierten Summe beim Gruppieren und Sortieren von Spalten mithilfe der Fensterfunktion von Spark's Python-Version DataFrame.
Es ist eine Methode, nach der ich gesucht habe, als ich mir die offizielle Python-API-Dokumentation angesehen habe. Es gibt also möglicherweise einen besseren Weg. Die verwendete Version von Spark ist 1.5.2.
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html
Bereiten Sie die Testdaten in der PostgreSQL-Tabelle vor und laden Sie sie als DataFrame in pyspark.
$ SPARK_CLASSPATH=postgresql-9.4-1202.jdbc41.jar PYSPARK_DRIVER_PYTHON=ipython pyspark
(..snip..)
In [1]: df = sqlContext.read.format('jdbc').options(url='jdbc:postgresql://localhost:5432/postgres?user=postgres', dbtable='public.foo').load()
(..snip..)
In [2]: df.printSchema()
root
|-- a: integer (nullable = true)
|-- b: timestamp (nullable = true)
|-- c: integer (nullable = true)
In [4]: df.show()
+---+--------------------+---+
| a| b| c|
+---+--------------------+---+
| 1|2015-11-22 10:00:...| 1|
| 1|2015-11-22 10:10:...| 2|
| 1|2015-11-22 10:20:...| 3|
| 1|2015-11-22 10:30:...| 4|
| 1|2015-11-22 10:40:...| 5|
| 1|2015-11-22 10:50:...| 6|
| 1|2015-11-22 11:00:...| 7|
| 1|2015-11-22 11:10:...| 8|
| 1|2015-11-22 11:20:...| 9|
| 1|2015-11-22 11:30:...| 10|
| 1|2015-11-22 11:40:...| 11|
| 1|2015-11-22 11:50:...| 12|
| 1|2015-11-22 12:00:...| 13|
| 2|2015-11-22 10:00:...| 1|
| 2|2015-11-22 10:10:...| 2|
| 2|2015-11-22 10:20:...| 3|
| 2|2015-11-22 10:30:...| 4|
| 2|2015-11-22 10:40:...| 5|
| 2|2015-11-22 10:50:...| 6|
| 2|2015-11-22 11:00:...| 7|
+---+--------------------+---+
only showing top 20 rows
Spalte a dient zur Gruppierung, Spalte b zur Sortierung und Spalte c zur Berechnung.
Sortieren Sie beim Gruppieren nach Spalte a nach Spalte b und nehmen Sie die kumulative Summe von Spalte c.
Erstens die Definition von Fenster
In [6]: from pyspark.sql.Window import Window
In [7]: from pyspark.sql import functions as func
In [8]: window = Window.partitionpartitionBy(df.a).orderBy(df.b).rangeBetween(-sys.maxsize,0)
In [9]: window
Out[9]: <pyspark.sql.window.WindowSpec at 0x18368d0>
Erstellen Sie in diesem Fenster eine Spalte, die pyspark.sql.functions.sum () berechnet
In [10]: cum_c = func.sum(df.c).over(window)
In [11]: cum_c
Out[11]: Column<'sum(c) WindowSpecDefinition UnspecifiedFrame>
Erstellen Sie einen neuen DataFrame, indem Sie diese Spalte an den ursprünglichen DataFrame anhängen
In [12]: mod_df = df.withColumn("cum_c", cum_c)
In [13]: mod_df
Out[13]: DataFrame[a: int, b: timestamp, c: int, cum_c: bigint]
In [14]: mod_df.printSchema()
root
|-- a: integer (nullable = true)
|-- b: timestamp (nullable = true)
|-- c: integer (nullable = true)
|-- cum_c: long (nullable = true)
In [15]: mod_df.show()
+---+--------------------+---+-----+
| a| b| c|cum_c|
+---+--------------------+---+-----+
| 1|2015-11-22 10:00:...| 1| 1|
| 1|2015-11-22 10:10:...| 2| 3|
| 1|2015-11-22 10:20:...| 3| 6|
| 1|2015-11-22 10:30:...| 4| 10|
| 1|2015-11-22 10:40:...| 5| 15|
| 1|2015-11-22 10:50:...| 6| 21|
| 1|2015-11-22 11:00:...| 7| 28|
| 1|2015-11-22 11:10:...| 8| 36|
| 1|2015-11-22 11:20:...| 9| 45|
| 1|2015-11-22 11:30:...| 10| 55|
| 1|2015-11-22 11:40:...| 11| 66|
| 1|2015-11-22 11:50:...| 12| 78|
| 1|2015-11-22 12:00:...| 13| 91|
| 2|2015-11-22 10:00:...| 1| 1|
| 2|2015-11-22 10:10:...| 2| 3|
| 2|2015-11-22 10:20:...| 3| 6|
| 2|2015-11-22 10:30:...| 4| 10|
| 2|2015-11-22 10:40:...| 5| 15|
| 2|2015-11-22 10:50:...| 6| 21|
| 2|2015-11-22 11:00:...| 7| 28|
+---+--------------------+---+-----+
only showing top 20 rows
Sie können es berechnen.
Berechnen Sie nun die Summe der Spalte c für jede Gruppe der Spalte a. Setzen Sie den DataFrame mit groupBy () auf pyspark.sql.GroupedData und verwenden Sie pyspark.sql.GroupedData.sum (). Es ist kompliziert mit sum (), aber seien Sie vorsichtig, denn wenn Sie die Option Column als Argument haben, erhalten Sie eine Fehlermeldung.
In [25]: sum_c_df = df.groupBy('a').sum('c')
Im Gegensatz zu zuvor ist dies keine Fensterfunktion, sodass das zurückgegebene Ergebnis ein DataFrame ist. Darüber hinaus wird der Spaltenname, in dem die Summe gespeichert ist, willkürlich festgelegt.
In [26]: sum_c_df
Out[26]: DataFrame[a: int, sum(c): bigint]
Nun, es ist kompliziert.
Fügen Sie es vorerst als Spalte an den ursprünglichen DataFrame an.
In [27]: mod_df3 = mod_df2.join('a'sum_c_df, 'a'()
In [28]: mod_df3.printSchema()
root
|-- a: integer (nullable = true)
|-- b: timestamp (nullable = true)
|-- c: integer (nullable = true)
|-- cum_c: long (nullable = true)
|-- sum(c): long (nullable = true)
In [29]: mod_df3.show()
(..snip..)
+---+--------------------+---+-------+------+
| a| b| c| cum_c|sum(c)|
+---+--------------------+---+-------+------+
| 1|2015-11-22 10:00:...| 1| 1| 91|
| 1|2015-11-22 10:10:...| 2| 3| 91|
| 1|2015-11-22 10:20:...| 3| 6| 91|
| 1|2015-11-22 10:30:...| 4| 10| 91|
| 1|2015-11-22 10:40:...| 5| 15| 91|
| 1|2015-11-22 10:50:...| 6| 21| 91|
| 1|2015-11-22 11:00:...| 7| 28| 91|
| 1|2015-11-22 11:10:...| 8| 36| 91|
| 1|2015-11-22 11:20:...| 9| 45| 91|
| 1|2015-11-22 11:30:...| 10| 55| 91|
| 1|2015-11-22 11:40:...| 11| 66| 91|
| 1|2015-11-22 11:50:...| 12| 78| 91|
| 1|2015-11-22 12:00:...| 13| 91| 91|
| 2|2015-11-22 10:00:...| 1| 1| 91|
| 2|2015-11-22 10:10:...| 2| 3| 91|
| 2|2015-11-22 10:20:...| 3| 6| 91|
| 2|2015-11-22 10:30:...| 4| 10| 91|
| 2|2015-11-22 10:40:...| 5| 15| 91|
| 2|2015-11-22 10:50:...| 6| 21| 91|
| 2|2015-11-22 11:00:...| 7| 28| 91|
+---+--------------------+---+-------+------+
only showing top 20 rows
Sie haben die Summe für jede Gruppe erfolgreich berechnet.
Berechnen wir nun den verbleibenden Wert bis zur Summe für Spalte c. Das heißt, die Gesamtsumme.
In [30]: diff_sum_c = mod_df3[('sum(c)'] - mod_df3['cum_c']
In [31]: mod_df4 = mod_df3.withColumn("diff_sum_c", diff_sum_c)
In [34]: mod_df4.show()
(..snip..)
+---+--------------------+---+-------+------+----------+
| a| b| c|cum_c_2|sum(c)|diff_sum_c|
+---+--------------------+---+-------+------+----------+
| 1|2015-11-22 10:00:...| 1| 1| 91| 90|
| 1|2015-11-22 10:10:...| 2| 3| 91| 88|
| 1|2015-11-22 10:20:...| 3| 6| 91| 85|
| 1|2015-11-22 10:30:...| 4| 10| 91| 81|
| 1|2015-11-22 10:40:...| 5| 15| 91| 76|
| 1|2015-11-22 10:50:...| 6| 21| 91| 70|
| 1|2015-11-22 11:00:...| 7| 28| 91| 63|
| 1|2015-11-22 11:10:...| 8| 36| 91| 55|
| 1|2015-11-22 11:20:...| 9| 45| 91| 46|
| 1|2015-11-22 11:30:...| 10| 55| 91| 36|
| 1|2015-11-22 11:40:...| 11| 66| 91| 25|
| 1|2015-11-22 11:50:...| 12| 78| 91| 13|
| 1|2015-11-22 12:00:...| 13| 91| 91| 0|
| 2|2015-11-22 10:00:...| 1| 1| 91| 90|
| 2|2015-11-22 10:10:...| 2| 3| 91| 88|
| 2|2015-11-22 10:20:...| 3| 6| 91| 85|
| 2|2015-11-22 10:30:...| 4| 10| 91| 81|
| 2|2015-11-22 10:40:...| 5| 15| 91| 76|
| 2|2015-11-22 10:50:...| 6| 21| 91| 70|
| 2|2015-11-22 11:00:...| 7| 28| 91| 63|
+---+--------------------+---+-------+------+----------+
only showing top 20 rows
Wie ich diesmal bemerkt habe, scheint die Verwendung von SPARK_CLASSPATH in Spark 1.0 und höher nicht empfohlen zu werden. Als ich mit pyspark anfing, bekam ich die folgende Nachricht.
15/11/22 12:32:44 WARN spark.SparkConf:
SPARK_CLASSPATH was detected (set to 'postgresql-9.4-1202.jdbc41.jar').
This is deprecated in Spark 1.0+.
Please instead use:
- ./spark-submit with --driver-class-path to augment the driver classpath
- spark.executor.extraClassPath to augment the executor classpath
Bei Verwendung eines Clusters wird diese Umgebungsvariable anscheinend nicht korrekt auf verschiedenen Servern übertragen. Daher wird empfohlen, einen anderen Parameter zu verwenden.
Umm. Ich muss den Unterschied zwischen lokaler und verteilter Umgebung verstehen.
Recommended Posts