Il s'agit d'une méthode pour calculer la somme cumulée lors du regroupement et du tri des colonnes à l'aide de la fonction Window de la version python de Spark DataFrame.
C'est une méthode que j'ai recherchée en consultant la documentation officielle de l'API Python, il existe donc peut-être un meilleur moyen. La version de Spark utilisée est la 1.5.2.
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html
Préparez les données de test dans la table PostgreSQL et chargez-les dans pyspark en tant que DataFrame.
$ SPARK_CLASSPATH=postgresql-9.4-1202.jdbc41.jar PYSPARK_DRIVER_PYTHON=ipython pyspark
(..snip..)
In [1]: df = sqlContext.read.format('jdbc').options(url='jdbc:postgresql://localhost:5432/postgres?user=postgres', dbtable='public.foo').load()
(..snip..)
In [2]: df.printSchema()
root
|-- a: integer (nullable = true)
|-- b: timestamp (nullable = true)
|-- c: integer (nullable = true)
In [4]: df.show()
+---+--------------------+---+
| a| b| c|
+---+--------------------+---+
| 1|2015-11-22 10:00:...| 1|
| 1|2015-11-22 10:10:...| 2|
| 1|2015-11-22 10:20:...| 3|
| 1|2015-11-22 10:30:...| 4|
| 1|2015-11-22 10:40:...| 5|
| 1|2015-11-22 10:50:...| 6|
| 1|2015-11-22 11:00:...| 7|
| 1|2015-11-22 11:10:...| 8|
| 1|2015-11-22 11:20:...| 9|
| 1|2015-11-22 11:30:...| 10|
| 1|2015-11-22 11:40:...| 11|
| 1|2015-11-22 11:50:...| 12|
| 1|2015-11-22 12:00:...| 13|
| 2|2015-11-22 10:00:...| 1|
| 2|2015-11-22 10:10:...| 2|
| 2|2015-11-22 10:20:...| 3|
| 2|2015-11-22 10:30:...| 4|
| 2|2015-11-22 10:40:...| 5|
| 2|2015-11-22 10:50:...| 6|
| 2|2015-11-22 11:00:...| 7|
+---+--------------------+---+
only showing top 20 rows
La colonne a est pour le regroupement, la colonne b pour le tri et la colonne c pour le calcul.
Lors du regroupement par colonne a, triez par colonne b et prenez la somme cumulée de la colonne c.
Tout d'abord, la définition de Window
In [6]: from pyspark.sql.Window import Window
In [7]: from pyspark.sql import functions as func
In [8]: window = Window.partitionpartitionBy(df.a).orderBy(df.b).rangeBetween(-sys.maxsize,0)
In [9]: window
Out[9]: <pyspark.sql.window.WindowSpec at 0x18368d0>
Créez une colonne qui calcule pyspark.sql.functions.sum () sur cette fenêtre
In [10]: cum_c = func.sum(df.c).over(window)
In [11]: cum_c
Out[11]: Column<'sum(c) WindowSpecDefinition UnspecifiedFrame>
Créez un nouveau DataFrame en attachant cette colonne au DataFrame d'origine
In [12]: mod_df = df.withColumn("cum_c", cum_c)
In [13]: mod_df
Out[13]: DataFrame[a: int, b: timestamp, c: int, cum_c: bigint]
In [14]: mod_df.printSchema()
root
|-- a: integer (nullable = true)
|-- b: timestamp (nullable = true)
|-- c: integer (nullable = true)
|-- cum_c: long (nullable = true)
In [15]: mod_df.show()
+---+--------------------+---+-----+
| a| b| c|cum_c|
+---+--------------------+---+-----+
| 1|2015-11-22 10:00:...| 1| 1|
| 1|2015-11-22 10:10:...| 2| 3|
| 1|2015-11-22 10:20:...| 3| 6|
| 1|2015-11-22 10:30:...| 4| 10|
| 1|2015-11-22 10:40:...| 5| 15|
| 1|2015-11-22 10:50:...| 6| 21|
| 1|2015-11-22 11:00:...| 7| 28|
| 1|2015-11-22 11:10:...| 8| 36|
| 1|2015-11-22 11:20:...| 9| 45|
| 1|2015-11-22 11:30:...| 10| 55|
| 1|2015-11-22 11:40:...| 11| 66|
| 1|2015-11-22 11:50:...| 12| 78|
| 1|2015-11-22 12:00:...| 13| 91|
| 2|2015-11-22 10:00:...| 1| 1|
| 2|2015-11-22 10:10:...| 2| 3|
| 2|2015-11-22 10:20:...| 3| 6|
| 2|2015-11-22 10:30:...| 4| 10|
| 2|2015-11-22 10:40:...| 5| 15|
| 2|2015-11-22 10:50:...| 6| 21|
| 2|2015-11-22 11:00:...| 7| 28|
+---+--------------------+---+-----+
only showing top 20 rows
Vous pouvez le calculer.
Calculez maintenant la somme de la colonne c pour chaque groupe de la colonne a. Définissez le DataFrame sur pyspark.sql.GroupedData avec groupBy () et utilisez pyspark.sql.GroupedData.sum (). C'est compliqué avec sum (), mais soyez prudent car si vous avez l'option Colonne comme argument, vous obtiendrez une erreur.
In [25]: sum_c_df = df.groupBy('a').sum('c')
De plus, contrairement à la précédente, il ne s'agit pas d'une fonction Window, le résultat renvoyé est donc un DataFrame. De plus, le nom de la colonne qui stocke la somme est décidé arbitrairement.
In [26]: sum_c_df
Out[26]: DataFrame[a: int, sum(c): bigint]
Eh bien, c'est compliqué.
Pour le moment, attachez-le en tant que colonne au DataFrame d'origine.
In [27]: mod_df3 = mod_df2.join('a'sum_c_df, 'a'()
In [28]: mod_df3.printSchema()
root
|-- a: integer (nullable = true)
|-- b: timestamp (nullable = true)
|-- c: integer (nullable = true)
|-- cum_c: long (nullable = true)
|-- sum(c): long (nullable = true)
In [29]: mod_df3.show()
(..snip..)
+---+--------------------+---+-------+------+
| a| b| c| cum_c|sum(c)|
+---+--------------------+---+-------+------+
| 1|2015-11-22 10:00:...| 1| 1| 91|
| 1|2015-11-22 10:10:...| 2| 3| 91|
| 1|2015-11-22 10:20:...| 3| 6| 91|
| 1|2015-11-22 10:30:...| 4| 10| 91|
| 1|2015-11-22 10:40:...| 5| 15| 91|
| 1|2015-11-22 10:50:...| 6| 21| 91|
| 1|2015-11-22 11:00:...| 7| 28| 91|
| 1|2015-11-22 11:10:...| 8| 36| 91|
| 1|2015-11-22 11:20:...| 9| 45| 91|
| 1|2015-11-22 11:30:...| 10| 55| 91|
| 1|2015-11-22 11:40:...| 11| 66| 91|
| 1|2015-11-22 11:50:...| 12| 78| 91|
| 1|2015-11-22 12:00:...| 13| 91| 91|
| 2|2015-11-22 10:00:...| 1| 1| 91|
| 2|2015-11-22 10:10:...| 2| 3| 91|
| 2|2015-11-22 10:20:...| 3| 6| 91|
| 2|2015-11-22 10:30:...| 4| 10| 91|
| 2|2015-11-22 10:40:...| 5| 15| 91|
| 2|2015-11-22 10:50:...| 6| 21| 91|
| 2|2015-11-22 11:00:...| 7| 28| 91|
+---+--------------------+---+-------+------+
only showing top 20 rows
Vous avez calculé avec succès le total pour chaque groupe.
Calculons maintenant la valeur restante jusqu'à la somme de la colonne c. Autrement dit, somme totale cumulée.
In [30]: diff_sum_c = mod_df3[('sum(c)'] - mod_df3['cum_c']
In [31]: mod_df4 = mod_df3.withColumn("diff_sum_c", diff_sum_c)
In [34]: mod_df4.show()
(..snip..)
+---+--------------------+---+-------+------+----------+
| a| b| c|cum_c_2|sum(c)|diff_sum_c|
+---+--------------------+---+-------+------+----------+
| 1|2015-11-22 10:00:...| 1| 1| 91| 90|
| 1|2015-11-22 10:10:...| 2| 3| 91| 88|
| 1|2015-11-22 10:20:...| 3| 6| 91| 85|
| 1|2015-11-22 10:30:...| 4| 10| 91| 81|
| 1|2015-11-22 10:40:...| 5| 15| 91| 76|
| 1|2015-11-22 10:50:...| 6| 21| 91| 70|
| 1|2015-11-22 11:00:...| 7| 28| 91| 63|
| 1|2015-11-22 11:10:...| 8| 36| 91| 55|
| 1|2015-11-22 11:20:...| 9| 45| 91| 46|
| 1|2015-11-22 11:30:...| 10| 55| 91| 36|
| 1|2015-11-22 11:40:...| 11| 66| 91| 25|
| 1|2015-11-22 11:50:...| 12| 78| 91| 13|
| 1|2015-11-22 12:00:...| 13| 91| 91| 0|
| 2|2015-11-22 10:00:...| 1| 1| 91| 90|
| 2|2015-11-22 10:10:...| 2| 3| 91| 88|
| 2|2015-11-22 10:20:...| 3| 6| 91| 85|
| 2|2015-11-22 10:30:...| 4| 10| 91| 81|
| 2|2015-11-22 10:40:...| 5| 15| 91| 76|
| 2|2015-11-22 10:50:...| 6| 21| 91| 70|
| 2|2015-11-22 11:00:...| 7| 28| 91| 63|
+---+--------------------+---+-------+------+----------+
only showing top 20 rows
Comme je l'ai remarqué cette fois, l'utilisation de SPARK_CLASSPATH ne semble pas être recommandée dans Spark 1.0 et supérieur. Quand j'ai commencé pyspark, j'ai reçu le message suivant.
15/11/22 12:32:44 WARN spark.SparkConf:
SPARK_CLASSPATH was detected (set to 'postgresql-9.4-1202.jdbc41.jar').
This is deprecated in Spark 1.0+.
Please instead use:
- ./spark-submit with --driver-class-path to augment the driver classpath
- spark.executor.extraClassPath to augment the executor classpath
Apparemment, lors de l'utilisation d'un cluster, cette variable d'environnement n'est pas transmise correctement sur différents serveurs, il semble donc recommandé d'utiliser un paramètre différent.
Umm. Je dois comprendre la différence entre l'environnement local et distribué.
Recommended Posts