Comment trouver la somme / somme cumulée pour chaque groupe à l'aide de DataFrame dans Spark [version Python]

Il s'agit d'une méthode pour calculer la somme cumulée lors du regroupement et du tri des colonnes à l'aide de la fonction Window de la version python de Spark DataFrame.

C'est une méthode que j'ai recherchée en consultant la documentation officielle de l'API Python, il existe donc peut-être un meilleur moyen. La version de Spark utilisée est la 1.5.2.

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html

Exemple de données

Préparez les données de test dans la table PostgreSQL et chargez-les dans pyspark en tant que DataFrame.

$ SPARK_CLASSPATH=postgresql-9.4-1202.jdbc41.jar PYSPARK_DRIVER_PYTHON=ipython pyspark
(..snip..)
In [1]: df = sqlContext.read.format('jdbc').options(url='jdbc:postgresql://localhost:5432/postgres?user=postgres', dbtable='public.foo').load()
(..snip..)
In [2]: df.printSchema()
root
 |-- a: integer (nullable = true)
 |-- b: timestamp (nullable = true)
 |-- c: integer (nullable = true)
In [4]: df.show()
+---+--------------------+---+
|  a|                   b|  c|
+---+--------------------+---+
|  1|2015-11-22 10:00:...|  1|
|  1|2015-11-22 10:10:...|  2|
|  1|2015-11-22 10:20:...|  3|
|  1|2015-11-22 10:30:...|  4|
|  1|2015-11-22 10:40:...|  5|
|  1|2015-11-22 10:50:...|  6|
|  1|2015-11-22 11:00:...|  7|
|  1|2015-11-22 11:10:...|  8|
|  1|2015-11-22 11:20:...|  9|
|  1|2015-11-22 11:30:...| 10|
|  1|2015-11-22 11:40:...| 11|
|  1|2015-11-22 11:50:...| 12|
|  1|2015-11-22 12:00:...| 13|
|  2|2015-11-22 10:00:...|  1|
|  2|2015-11-22 10:10:...|  2|
|  2|2015-11-22 10:20:...|  3|
|  2|2015-11-22 10:30:...|  4|
|  2|2015-11-22 10:40:...|  5|
|  2|2015-11-22 10:50:...|  6|
|  2|2015-11-22 11:00:...|  7|
+---+--------------------+---+
only showing top 20 rows

La colonne a est pour le regroupement, la colonne b pour le tri et la colonne c pour le calcul.

Somme cumulée pour chaque groupe de colonnes

Lors du regroupement par colonne a, triez par colonne b et prenez la somme cumulée de la colonne c.

Tout d'abord, la définition de Window

In [6]: from pyspark.sql.Window import Window

In [7]: from pyspark.sql import functions as func

In [8]: window = Window.partitionpartitionBy(df.a).orderBy(df.b).rangeBetween(-sys.maxsize,0)

In [9]: window
Out[9]: <pyspark.sql.window.WindowSpec at 0x18368d0>

Créez une colonne qui calcule pyspark.sql.functions.sum () sur cette fenêtre

In [10]: cum_c = func.sum(df.c).over(window)

In [11]: cum_c
Out[11]: Column<'sum(c) WindowSpecDefinition UnspecifiedFrame>

Créez un nouveau DataFrame en attachant cette colonne au DataFrame d'origine

In [12]: mod_df = df.withColumn("cum_c", cum_c)

In [13]: mod_df
Out[13]: DataFrame[a: int, b: timestamp, c: int, cum_c: bigint]

In [14]: mod_df.printSchema()
root
 |-- a: integer (nullable = true)
 |-- b: timestamp (nullable = true)
 |-- c: integer (nullable = true)
 |-- cum_c: long (nullable = true)


In [15]: mod_df.show()
+---+--------------------+---+-----+
|  a|                   b|  c|cum_c|
+---+--------------------+---+-----+
|  1|2015-11-22 10:00:...|  1|    1|
|  1|2015-11-22 10:10:...|  2|    3|
|  1|2015-11-22 10:20:...|  3|    6|
|  1|2015-11-22 10:30:...|  4|   10|
|  1|2015-11-22 10:40:...|  5|   15|
|  1|2015-11-22 10:50:...|  6|   21|
|  1|2015-11-22 11:00:...|  7|   28|
|  1|2015-11-22 11:10:...|  8|   36|
|  1|2015-11-22 11:20:...|  9|   45|
|  1|2015-11-22 11:30:...| 10|   55|
|  1|2015-11-22 11:40:...| 11|   66|
|  1|2015-11-22 11:50:...| 12|   78|
|  1|2015-11-22 12:00:...| 13|   91|
|  2|2015-11-22 10:00:...|  1|    1|
|  2|2015-11-22 10:10:...|  2|    3|
|  2|2015-11-22 10:20:...|  3|    6|
|  2|2015-11-22 10:30:...|  4|   10|
|  2|2015-11-22 10:40:...|  5|   15|
|  2|2015-11-22 10:50:...|  6|   21|
|  2|2015-11-22 11:00:...|  7|   28|
+---+--------------------+---+-----+
only showing top 20 rows

Vous pouvez le calculer.

Somme de chaque groupe de colonnes

Calculez maintenant la somme de la colonne c pour chaque groupe de la colonne a. Définissez le DataFrame sur pyspark.sql.GroupedData avec groupBy () et utilisez pyspark.sql.GroupedData.sum (). C'est compliqué avec sum (), mais soyez prudent car si vous avez l'option Colonne comme argument, vous obtiendrez une erreur.

In [25]: sum_c_df = df.groupBy('a').sum('c')

De plus, contrairement à la précédente, il ne s'agit pas d'une fonction Window, le résultat renvoyé est donc un DataFrame. De plus, le nom de la colonne qui stocke la somme est décidé arbitrairement.

In [26]: sum_c_df
Out[26]: DataFrame[a: int, sum(c): bigint]

Eh bien, c'est compliqué.

Pour le moment, attachez-le en tant que colonne au DataFrame d'origine.

In [27]: mod_df3 = mod_df2.join('a'sum_c_df, 'a'()

In [28]: mod_df3.printSchema()
root
 |-- a: integer (nullable = true)
 |-- b: timestamp (nullable = true)
 |-- c: integer (nullable = true)
 |-- cum_c: long (nullable = true)
 |-- sum(c): long (nullable = true)


In [29]: mod_df3.show()
(..snip..)
+---+--------------------+---+-------+------+
|  a|                   b|  c|  cum_c|sum(c)|
+---+--------------------+---+-------+------+
|  1|2015-11-22 10:00:...|  1|      1|    91|
|  1|2015-11-22 10:10:...|  2|      3|    91|
|  1|2015-11-22 10:20:...|  3|      6|    91|
|  1|2015-11-22 10:30:...|  4|     10|    91|
|  1|2015-11-22 10:40:...|  5|     15|    91|
|  1|2015-11-22 10:50:...|  6|     21|    91|
|  1|2015-11-22 11:00:...|  7|     28|    91|
|  1|2015-11-22 11:10:...|  8|     36|    91|
|  1|2015-11-22 11:20:...|  9|     45|    91|
|  1|2015-11-22 11:30:...| 10|     55|    91|
|  1|2015-11-22 11:40:...| 11|     66|    91|
|  1|2015-11-22 11:50:...| 12|     78|    91|
|  1|2015-11-22 12:00:...| 13|     91|    91|
|  2|2015-11-22 10:00:...|  1|      1|    91|
|  2|2015-11-22 10:10:...|  2|      3|    91|
|  2|2015-11-22 10:20:...|  3|      6|    91|
|  2|2015-11-22 10:30:...|  4|     10|    91|
|  2|2015-11-22 10:40:...|  5|     15|    91|
|  2|2015-11-22 10:50:...|  6|     21|    91|
|  2|2015-11-22 11:00:...|  7|     28|    91|
+---+--------------------+---+-------+------+
only showing top 20 rows

Vous avez calculé avec succès le total pour chaque groupe.

(Somme totale cumulée) pour chaque groupe de colonnes

Calculons maintenant la valeur restante jusqu'à la somme de la colonne c. Autrement dit, somme totale cumulée.

In [30]: diff_sum_c = mod_df3[('sum(c)'] - mod_df3['cum_c']

In [31]: mod_df4 = mod_df3.withColumn("diff_sum_c", diff_sum_c)

In [34]: mod_df4.show()
(..snip..)
+---+--------------------+---+-------+------+----------+
|  a|                   b|  c|cum_c_2|sum(c)|diff_sum_c|
+---+--------------------+---+-------+------+----------+
|  1|2015-11-22 10:00:...|  1|      1|    91|        90|
|  1|2015-11-22 10:10:...|  2|      3|    91|        88|
|  1|2015-11-22 10:20:...|  3|      6|    91|        85|
|  1|2015-11-22 10:30:...|  4|     10|    91|        81|
|  1|2015-11-22 10:40:...|  5|     15|    91|        76|
|  1|2015-11-22 10:50:...|  6|     21|    91|        70|
|  1|2015-11-22 11:00:...|  7|     28|    91|        63|
|  1|2015-11-22 11:10:...|  8|     36|    91|        55|
|  1|2015-11-22 11:20:...|  9|     45|    91|        46|
|  1|2015-11-22 11:30:...| 10|     55|    91|        36|
|  1|2015-11-22 11:40:...| 11|     66|    91|        25|
|  1|2015-11-22 11:50:...| 12|     78|    91|        13|
|  1|2015-11-22 12:00:...| 13|     91|    91|         0|
|  2|2015-11-22 10:00:...|  1|      1|    91|        90|
|  2|2015-11-22 10:10:...|  2|      3|    91|        88|
|  2|2015-11-22 10:20:...|  3|      6|    91|        85|
|  2|2015-11-22 10:30:...|  4|     10|    91|        81|
|  2|2015-11-22 10:40:...|  5|     15|    91|        76|
|  2|2015-11-22 10:50:...|  6|     21|    91|        70|
|  2|2015-11-22 11:00:...|  7|     28|    91|        63|
+---+--------------------+---+-------+------+----------+
only showing top 20 rows

Supplément

Comme je l'ai remarqué cette fois, l'utilisation de SPARK_CLASSPATH ne semble pas être recommandée dans Spark 1.0 et supérieur. Quand j'ai commencé pyspark, j'ai reçu le message suivant.

15/11/22 12:32:44 WARN spark.SparkConf: 
SPARK_CLASSPATH was detected (set to 'postgresql-9.4-1202.jdbc41.jar').
This is deprecated in Spark 1.0+.

Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath
  

Apparemment, lors de l'utilisation d'un cluster, cette variable d'environnement n'est pas transmise correctement sur différents serveurs, il semble donc recommandé d'utiliser un paramètre différent.

Umm. Je dois comprendre la différence entre l'environnement local et distribué.

Recommended Posts

Comment trouver la somme / somme cumulée pour chaque groupe à l'aide de DataFrame dans Spark [version Python]
Comment obtenir la version Python
Comment définir la résolution de sortie pour chaque image clé dans Blender
[Introduction à Python] Comment utiliser l'opérateur in dans l'instruction for?
Comment vérifier la version d'opencv avec python
Changer le module à charger pour chaque environnement d'exécution en Python
Faites correspondre la distribution de chaque groupe en Python
Comment spécifier la version TLS dans les requêtes python
Comment trouver la corrélation pour les variables catégorielles
[Circuit x Python] Comment trouver la fonction de transfert d'un circuit en utilisant Lcapy
Comment trouver le coefficient de la courbe approximative passant par les sommets en Python
Trouver la fonction de distribution cumulative par tri (version Python)
Comment récupérer la nième plus grande valeur en Python
[Pour les débutants] Comment utiliser la commande say avec python!
Comment obtenir le nom de la variable lui-même en python
Comment obtenir le nombre de chiffres en Python
Comment connaître le répertoire actuel en Python dans Blender
Comment soumettre automatiquement des formulaires Microsoft à l'aide de python (version Mac)
Comment quitter lors de l'utilisation de Python dans Terminal (Mac)
Comment récupérer plusieurs tableaux à l'aide de slice en python.
[Introduction à Python] Comment arrêter la boucle en utilisant break?
Comment exécuter une commande à l'aide d'un sous-processus en Python
[Python] Comment afficher les valeurs de liste dans l'ordre
Conversion de type entier pour chaque colonne de dataframe en python
[Introduction à Python] Comment écrire des instructions répétitives à l'aide d'instructions for
Je veux juste trouver l'intervalle de confiance à 95% de la différence de ratio de population en Python
Comment tester unitaire une fonction contenant l'heure actuelle à l'aide de Freezegun en Python
Sortie de la table spécifiée de la base de données Oracle en Python vers Excel pour chaque fichier
Comment compter le nombre d'occurrences de chaque élément de la liste en Python avec poids
Comment définir l'environnement de développement pour chaque projet avec VSCode + extension Python + Miniconda
Comment trouver le premier élément qui correspond aux critères de la liste Python
Comment trouver le nombre optimal de clusters pour les k-moyennes
[python] Comment vérifier si la clé existe dans le dictionnaire
Conversion de Pandas DataFrame en System.Data.DataTable à l'aide de Python pour .NET
Vérifiez le fonctionnement de Python pour .NET dans chaque environnement
[python] Comment utiliser Matplotlib, une bibliothèque pour dessiner des graphiques
Trouver des erreurs en Python
Comment utiliser la méthode __call__ dans la classe Python
Comment définir plusieurs variables dans une instruction Python for
Comment spécifier Cache-Control pour le stockage BLOB dans le stockage Azure en Python
Comment générer une requête à l'aide de l'opérateur IN dans Django
Comment obtenir la dernière (dernière) valeur d'une liste en Python
Comment implémenter Python EXE pour Windows avec le conteneur Docker
Je ne savais pas comment utiliser l'instruction [python] for
Découvrez combien de chaque caractère est dans la chaîne.
Comment changer la version de Python
Comment développer en Python
Comment passer le résultat de l'exécution d'une commande shell dans une liste en Python (version non bloquante)
Comment exécuter la commande sed plusieurs fois à l'aide de l'instruction for
Choses à surveiller lors de l'utilisation d'arguments par défaut en Python
Comment déterminer l'existence d'un élément sélénium en Python
Comment modifier le niveau de journalisation d'Azure SDK pour Python
Comment connaître la structure interne d'un objet en Python
python / pandas / dataframe / Comment obtenir la ligne / colonne / index / colonne la plus simple
Le 15e comment écrire un problème de référence en temps réel hors ligne en Python
Comment obtenir des abonnés et des abonnés de Python à l'aide de l'API Mastodon
17e comment résoudre les problèmes d'écriture en temps réel hors ligne avec Python
Comment vérifier la taille de la mémoire d'une variable en Python
Redimensionner automatiquement les captures d'écran de l'App Store pour chaque écran en Python
Comment déterminer qu'une clé croisée a été entrée dans Python3