[PYTHON] Exemple de collecte de code Spark Dataframe

Introduction: qu'est-ce que Spark Dataframe?

Depuis Spark Ver 1.3, une fonction appelée Spark Dataframe a été ajoutée. Les caractéristiques sont les suivantes.

En d'autres termes, il s'agit d'un code simple et d'un traitement plus rapide que l'écriture en RDD map ou filter. En supposant que le prétraitement des données soit effectué par RDD, il est préférable de les lire immédiatement dans Dataframe avec maji. Étant donné que les mémos de Dataframe sont dispersés, je sauvegarderai l'exemple de code lorsque j'aurai un mémorandum.

De plus, il convient de noter

Charger un exemple de journal

Utilisez le journal d'accès comme sujet. Journal d'accès (csv) utilisé dans Technical Review Company's Book, au fichier csv Cliquez ici pour Nao Rin. Le contenu de csv est le journal suivant avec 3 informations de date, User_ID, Campaign_ID

click.at	user.id	campaign.id
2015/4/27 20:40	144012	Campaign077
2015/4/27 0:27	24485	Campaign063
2015/4/27 0:28	24485	Campaign063
2015/4/27 0:33	24485	Campaign038

Lisez csv et faites-le RDD. Supprimez l'en-tête de la première ligne et lisez la première colonne en tant qu'objet datetime.

import json, os, datetime, collections, commands
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *

if not os.path.exists("./click_data_sample.csv"):
    print "csv file not found at master node, will download and copy to HDFS"
    commands.getoutput("wget -q http://image.gihyo.co.jp/assets/files/book/2015/978-4-7741-7631-4/download/click_data_sample.csv")
    commands.getoutput("hadoop fs -copyFromLocal -f ./click_data_sample.csv /user/hadoop/")

whole_raw_log = sc.textFile("/user/hadoop/click_data_sample.csv")
header = whole_raw_log.first()
whole_log = whole_raw_log.filter(lambda x:x !=header).map(lambda line: line.split(","))\
            .map(lambda line: [datetime.datetime.strptime(line[0].replace('"', ''), '%Y-%m-%d %H:%M:%S'), int(line[1]), line[2].replace('"', '')])

whole_log.take(3)
#[[datetime.datetime(2015, 4, 27, 20, 40, 40), 144012, u'Campaign077'],
# [datetime.datetime(2015, 4, 27, 0, 27, 55), 24485, u'Campaign063'],
# [datetime.datetime(2015, 4, 27, 0, 28, 13), 24485, u'Campaign063']]

Comment créer un Dataframe

** Créé à partir de RDD **

Dataframe peut être créé avec sqlContext.createDataFrame (my_rdd, my_schema) en spécifiant le nom de la colonne et chaque type (TimestampType, ʻIntegerType, StringType`, etc.) s'il existe un RDD original. Je vais. Voir ici pour la définition de Schéma.

printSchema (), dtypes sont les informations du schéma, count () ʻest le nombre de lignes, et show (n) ʻest les n premiers enregistrements.

fields = [StructField("access_time", TimestampType(), True), StructField("userID", IntegerType(), True), StructField("campaignID", StringType(), True)]
schema = StructType(fields)

whole_log_df = sqlContext.createDataFrame(whole_log, schema)
print whole_log_df.count()
print whole_log_df.printSchema()
print whole_log_df.dtypes
print whole_log_df.show(5)

#327430
#root
# |-- access_time: timestamp (nullable = true)
# |-- userID: integer (nullable = true)
# |-- campaignID: string (nullable = true)
#
#[('access_time', 'timestamp'), ('userID', 'int'), ('campaignID', 'string')]
#
#+--------------------+------+-----------+
#|         access_time|userID| campaignID|
#+--------------------+------+-----------+
#|2015-04-27 20:40:...|144012|Campaign077|
#|2015-04-27 00:27:...| 24485|Campaign063|
#|2015-04-27 00:28:...| 24485|Campaign063|
#|2015-04-27 00:33:...| 24485|Campaign038|
#|2015-04-27 01:00:...| 24485|Campaign063|
** Créé directement à partir du fichier csv **

Pour convertir les données lues à partir de csv en un Dataframe tel quel, utilisez spark-csv, qui est l'un des Spark Package. Il est plus facile d'utiliser databricks / spark-csv). Sauf indication contraire, tous sont lus comme des chaînes, mais si vous spécifiez ʻinfer Schema`, ce sera une bonne analogie.

whole_log_df_2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/user/hadoop/click_data_sample.csv")
print whole_log_df_2.printSchema()
print whole_log_df_2.show(5)

#root
# |-- click.at: string (nullable = true)
# |-- user.id: string (nullable = true)
# |-- campaign.id: string (nullable = true)
#
#+-------------------+-------+-----------+
#|           click.at|user.id|campaign.id|
#+-------------------+-------+-----------+
#|2015-04-27 20:40:40| 144012|Campaign077|
#|2015-04-27 00:27:55|  24485|Campaign063|
#|2015-04-27 00:28:13|  24485|Campaign063|
#|2015-04-27 00:33:42|  24485|Campaign038|
#|2015-04-27 01:00:04|  24485|Campaign063|

whole_log_df_3 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/user/hadoop/click_data_sample.csv")
print whole_log_df_3.printSchema()

#root
# |-- click.at: timestamp (nullable = true)
# |-- user.id: integer (nullable = true)
# |-- campaign.id: string (nullable = true)

En passant, il est gênant d'avoir . dans le nom de la colonne, vous pouvez donc le renommer avec withColumnRenamed (vous pouvez créer un autre Dataframe renommé).

whole_log_df_4 = whole_log_df_3.withColumnRenamed("click.at", "access_time")\
                 .withColumnRenamed("user.id", "userID")\
                 .withColumnRenamed("campaign.id", "campaignID")
print whole_log_df_4.printSchema()

#root
# |-- access_time: timestamp (nullable = true)
# |-- userID: integer (nullable = true)
# |-- campaignID: string (nullable = true)
** Créé directement depuis json **

Utilisez sqlContext.read.json pour convertir les données lues à partir du fichier json en un Dataframe tel quel. Traitez chaque ligne de fichier comme un objet json 1, s'il existe une clé qui n'existe pas, «null» sera entré.

# test_json.json contains following 3 lines, last line doesn't have "campaignID" key
#
#{"access_time": "2015-04-27 20:40:40", "userID": "24485", "campaignID": "Campaign063"}
#{"access_time": "2015-04-27 00:27:55", "userID": "24485", "campaignID": "Campaign038"}
#{"access_time": "2015-04-27 00:27:55", "userID": "24485"}

df_json = sqlContext.read.json("/user/hadoop/test_json.json")
df_json.printSchema()
df_json.show(5)

#root
# |-- access_time: string (nullable = true)
# |-- campaignID: string (nullable = true)
# |-- userID: string (nullable = true)
#
#+-------------------+-----------+------+
#|        access_time| campaignID|userID|
#+-------------------+-----------+------+
#|2015-04-27 20:40:40|Campaign063| 24485|
#|2015-04-27 00:27:55|Campaign038| 24485|
#|2015-04-27 00:27:55|       null| 24485|
#+-------------------+-----------+------+
** Créé directement à partir du parquet **

Utilisez sqlContext.read.parquet pour convertir les données lues à partir du fichier parquet en un Dataframe tel quel. Si vous spécifiez le dossier dans lequel se trouve le fichier parquet, les fichiers parquet sous ce dossier seront lus dans un lot.

sqlContext.read.parquet("/user/hadoop/parquet_folder/")

Requête avec instruction SQL

Ceci est un exemple qui interroge le Dataframe avec des instructions SQL. Si vous donnez le nom de table SQL à Dataframe avec registerTempTable, vous pouvez y faire référence en tant que nom de table SQL. La valeur de retour de sqlContext.sql (instruction SQL) est également un Dataframe.

Il est possible de décrire la sous-requête, mais sachez que si vous n'ajoutez pas d'alias au côté de la sous-requête, une erreur de syntaxe se produira pour une raison quelconque.

#Requête SQL simple

whole_log_df.registerTempTable("whole_log_table")

print sqlContext.sql(" SELECT * FROM whole_log_table where campaignID == 'Campaign047' ").count()
#18081
print sqlContext.sql(" SELECT * FROM whole_log_table where campaignID == 'Campaign047' ").show(5)
#+--------------------+------+-----------+
#|         access_time|userID| campaignID|
#+--------------------+------+-----------+
#|2015-04-27 05:26:...| 14151|Campaign047|
#|2015-04-27 05:26:...| 14151|Campaign047|
#|2015-04-27 05:26:...| 14151|Campaign047|
#|2015-04-27 05:27:...| 14151|Campaign047|
#|2015-04-27 05:28:...| 14151|Campaign047|
#+--------------------+------+-----------+


#Lors de la mise en place de variables dans des instructions SQL
for count in range(1, 3):
    print "Campaign00" + str(count)
    print sqlContext.sql("SELECT count(*) as access_num FROM whole_log_table where campaignID == 'Campaign00" + str(count) + "'").show()

#Campaign001
#+----------+
#|access_num|
#+----------+
#|      2407|
#+----------+
#
#Campaign002
#+----------+
#|access_num|
#+----------+
#|      1674|
#+----------+

#Pour la sous-requête:
print sqlContext.sql("SELECT count(*) as first_count FROM (SELECT userID, min(access_time) as first_access_date FROM whole_log_table GROUP BY userID) subquery_alias WHERE first_access_date < '2015-04-28'").show(5)
#+------------+
#|first_count |
#+------------+
#|       20480|
#+------------+

Recherche conditionnelle avec ** filtre **, ** sélectionnez **

Il s'agit d'une fonction de recherche simple pour Dataframe. L'instruction SQL ci-dessus est similaire en fonction à Query, mais "filter" et "select" sont positionnés comme des fonctions de recherche simples. filter extrait les lignes qui remplissent les conditions et select extrait les colonnes. Notez que la grammaire est légèrement différente du «filtre» RDD.

#Sample for filter
print whole_log_df.filter(whole_log_df["access_time"] < "2015-04-28").count()
#41434
print whole_log_df.filter(whole_log_df["access_time"] > "2015-05-01").show(3)
#+--------------------+------+-----------+
#|         access_time|userID| campaignID|
#+--------------------+------+-----------+
#|2015-05-01 22:11:...|114157|Campaign002|
#|2015-05-01 23:36:...| 93708|Campaign055|
#|2015-05-01 22:51:...| 57798|Campaign046|
#+--------------------+------+-----------+

#Sample for select
print whole_log_df.select("access_time", "userID").show(3)
#+--------------------+------+
#|         access_time|userID|
#+--------------------+------+
#|2015-04-27 20:40:...|144012|
#|2015-04-27 00:27:...| 24485|
#|2015-04-27 00:28:...| 24485|
#+--------------------+------+

Agréger par ** groupBy **

groupBy fournit des fonctionnalités similaires à reductionByKey de RDD, mais groupBy est la méthode ici. En appelant .sql.html # pyspark.sql.GroupedData) après cela, diverses fonctions d'agrégation peuvent être réalisées. Des exemples typiques sont «agg» et «count».

Agréger par ** groupBy ** → ** count **

Exécute groupBy avec campaignID comme clé et compte le nombre d'enregistrements aveccount (). Si vous énumérez plusieurs clés dans groupBy, la combinaison sera utilisée comme clé pour groupBy.

print whole_log_df.groupBy("campaignID").count().sort("count", ascending=False).show(5)
#+-----------+-----+
#| campaignID|count|
#+-----------+-----+
#|Campaign116|22193|
#|Campaign027|19206|
#|Campaign047|18081|
#|Campaign107|13295|
#|Campaign131| 9068|
#+-----------+-----+

print whole_log_df.groupBy("campaignID", "userID").count().sort("count", ascending=False).show(5)
#+-----------+------+-----+
#| campaignID|userID|count|
#+-----------+------+-----+
#|Campaign047| 30292|  633|
#|Campaign086|107624|  623|
#|Campaign047|121150|  517|
#|Campaign086| 22975|  491|
#|Campaign122| 90714|  431|
#+-----------+------+-----+
Agréger par ** groupBy ** → ** agg **

Vous pouvez exécuter GroupBy avec ʻuserIDcomme clé et calculer la moyenne et le maximum / minimum des résultats agrégés. Renvoie le résultat de l'exécution de la fonctionvalue ( min, sum, ʻave etc) sur la colonne key avec ʻagg ({key: value}). Puisque la valeur de retour est un Dataframe, il est possible de réduire davantage les lignes avec .filter ()`.

print whole_log_df.groupBy("userID").agg({"access_time": "min"}).show(3)
#+------+--------------------+
#|userID|    min(access_time)|
#+------+--------------------+
#|  4831|2015-04-27 22:49:...|
#| 48631|2015-04-27 22:15:...|
#|143031|2015-04-27 21:52:...|
#+------+--------------------+

print whole_log_df.groupBy("userID").agg({"access_time": "min"}).filter("min(access_time) < '2015-04-28'").count()
#20480
Conversion verticale / horizontale avec ** groupBy ** → ** pivot **

Pivot est une nouvelle fonctionnalité de Spark v1.6 Fournit des fonctionnalités similaires à SQL Pivot -spark.html). Dans le cas de Pivot of Sample code, les modifications verticales et horizontales comme suit.

Vous devez toujours appeler groupBy ("colonnes qui restent verticales"). Pivot ("colonnes que vous voulez convertir de portrait en paysage"). Sum ("colonnes de valeurs agrégées") et trois méthodes de la chaîne.

agged_df = whole_log_df.groupBy("userID", "campaignID").count()
print agged_df.show(3)

#+------+-----------+-----+
#|userID| campaignID|count|
#+------+-----------+-----+
#|155812|Campaign107|    4|
#|103339|Campaign027|    1|
#|169114|Campaign112|    1|
#+------+-----------+-----+

#Une cellule sans valeur sera nulle
pivot_df = agged_df.groupBy("userID").pivot("campaignID").sum("count")
print pivot_df.printSchema()

#root
# |-- userID: integer (nullable = true)
# |-- Campaign001: long (nullable = true)
# |-- Campaign002: long (nullable = true)
# ..
# |-- Campaign133: long (nullable = true)

#Lorsque vous souhaitez remplir une cellule sans valeur avec 0
pivot_df2 = agged_df.groupBy("userID").pivot("campaignID").sum("count").fillna(0)

Ajouter des colonnes avec UDF

UDF peut être utilisé dans Spark Dataframe, je pense que l'utilisation principale est d'ajouter des colonnes. Étant donné que le Dataframe est fondamentalement immuable, vous ne pouvez pas modifier le contenu de la colonne et vous allez créer un autre Dataframe avec la colonne ajoutée.

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType

def add_day_column(access_time):
    return int(access_time.strftime("%Y%m%d"))
    
my_udf = UserDefinedFunction(add_day_column, IntegerType())
print whole_log_df.withColumn("access_day", my_udf("access_time")).show(5)

#+--------------------+------+-----------+----------+
#|         access_time|userID| campaignID|access_day|
#+--------------------+------+-----------+----------+
#|2015-04-27 20:40:...|144012|Campaign077|  20150427|
#|2015-04-27 00:27:...| 24485|Campaign063|  20150427|
#|2015-04-27 00:28:...| 24485|Campaign063|  20150427|
#|2015-04-27 00:33:...| 24485|Campaign038|  20150427|
#|2015-04-27 01:00:...| 24485|Campaign063|  20150427|
#+--------------------+------+-----------+----------+

La notation UDF peut également être écrite à l'aide de la fonction lambda.

my_udf2 = UserDefinedFunction(lambda x: x + 5, IntegerType())
print whole_log_df.withColumn("userID_2", my_udf2("userID")).show(5)

#+--------------------+------+-----------+--------+
#|         access_time|userID| campaignID|userID_2|
#+--------------------+------+-----------+--------+
#|2015-04-27 20:40:...|144012|Campaign077|  144017|
#|2015-04-27 00:27:...| 24485|Campaign063|   24490|
#|2015-04-27 00:28:...| 24485|Campaign063|   24490|
#|2015-04-27 00:33:...| 24485|Campaign038|   24490|
#|2015-04-27 01:00:...| 24485|Campaign063|   24490|
#+--------------------+------+-----------+--------+

Inversement, utilisez df.drop () pour créer un Dataframe qui souhaite supprimer une colonne particulière.

print whole_log_df.drop("userID").show(3)

#+--------------------+-----------+
#|         access_time| campaignID|
#+--------------------+-----------+
#|2015-04-27 20:40:...|Campaign077|
#|2015-04-27 00:27:...|Campaign063|
#|2015-04-27 00:28:...|Campaign063|
#+--------------------+-----------+

Rejoignez deux Dataframes avec Join

Il est également possible de joindre deux Dataframes. Ici, considérons le cas où seul le journal d'un utilisateur lourd (utilisateur avec 100 accès ou plus) est extrait du journal entier.

Premièrement, l'ID utilisateur d'un utilisateur qui a 100 accès ou plus et le nombre d'accès sont agrégés par `.groupBy (" userID "). Count ()" et réduit à 100 ou plus par "filtre".

heavy_user_df1 = whole_log_df.groupBy("userID").count()
heavy_user_df2 = heavy_user_df1.filter(heavy_user_df1 ["count"] >= 100)

print heavy_user_df2 .printSchema()
print heavy_user_df2 .show(3)
print heavy_user_df2 .count()

#root
# |-- userID: integer (nullable = true)
# |-- count: long (nullable = false)
#
#+------+-----+
#|userID|count|
#+------+-----+
#| 84231|  134|
#| 13431|  128|
#|144432|  113|
#+------+-----+
#
#177

Si vous appelez la méthode join dans le Dataframe d'origine (qui sera à gauche) et écrivez la condition de jointure avec le partenaire de jointure (qui sera à droite), vous pouvez rejoindre le Dataframe comme une jointure SQL.

Le format de jointure doit être ʻinner, ʻouter, left_outer, rignt_outer, etc., mais autre que ʻinner ne fonctionne pas comme prévu (et est traité comme ʻouter). Pour le moment, j'essaye de joindre à l'extérieur avec ʻinner, puis de supprimer les colonnes inutiles avec drop`. Veuillez consulter la page officielle pour les options détaillées, etc..

Par le processus de jointure suivant, nous avons pu récupérer le journal de 38 729 lignes correspondant aux utilisateurs (177 personnes) qui ont 100 accès ou plus (le journal total est d'environ 320 000 lignes).

joinded_df = whole_log_df.join(heavy_user_df2, whole_log_df["userID"] == heavy_user_df2["userID"], "inner").drop(heavy_user_df2["userID"]).drop("count")
print joinded_df.printSchema()
print joinded_df.show(3)
print joinded_df.count()

#root
# |-- access_time: timestamp (nullable = true)
# |-- campaignID: string (nullable = true)
# |-- userID: integer (nullable = true)

#None
#+--------------------+-----------+------+
#|         access_time| campaignID|userID|
#+--------------------+-----------+------+
#|2015-04-27 02:07:...|Campaign086| 13431|
#|2015-04-28 00:07:...|Campaign086| 13431|
#|2015-04-29 06:01:...|Campaign047| 13431|
#+--------------------+-----------+------+
#
#38729

Extraire des colonnes de Dataframe

print whole_log_df.columns
#['access_time', 'userID', 'campaignID']

print whole_log_df.select("userID").map(lambda x: x[0]).collect()[:5]
#[144012, 24485, 24485, 24485, 24485]

print whole_log_df.select("userID").distinct().map(lambda x:x[0]).collect()[:5]
#[4831, 48631, 143031, 39631, 80831]

Retour de Dataframe vers RDD / List

Il existe deux manières principales de convertir un Dataframe en RDD.

#convert to rdd by ".map"
print whole_log_df.groupBy("campaignID").count().map(lambda x: [x[0], x[1]]).take(5)
#[[u'Campaign033', 786], [u'Campaign034', 3867], [u'Campaign035', 963], [u'Campaign036', 1267], [u'Campaign037', 1010]]

# rdd -> normal list can be done with "collect".
print whole_log_df.groupBy("campaignID").count().map(lambda x: [x[0], x[1]]).collect()[:5]
#[[u'Campaign033', 786], [u'Campaign034', 3867], [u'Campaign035', 963], [u'Campaign036', 1267], [u'Campaign037', 1010]]

#convert to rdd by ".rdd" will return "Row" object
print whole_log_df.groupBy("campaignID").rdd.take(3)
#[Row(campaignID=u'Campaign033', count=786), Row(campaignID=u'Campaign034', count=3867), Row(campaignID=u'Campaign035', count=963)]

#`.asDict()` will convert to Key-Value RDD from Row object
print whole_log_df.groupBy("campaignID").rdd.map(lambda x:x.asDict()).take(3)
#[{'count': 786, 'campaignID': u'Campaign033'}, {'count': 3867, 'campaignID': u'Campaign034'}, {'count': 963, 'campaignID': u'Campaign035'}]

Exportation de Dataframe vers un fichier Parquet

Si vous exportez le Dataframe vers un fichier au format Parquet, vous pouvez exporter vers un fichier tout en conservant les informations de schéma. Si le répertoire du compartiment S3 à exporter existe déjà, l'écriture échouera. Spécifiez un nom de répertoire qui n'existe pas encore.

#write to parquet filed
whole_log_df.select("access_time", "userID").write.parquet("s3n://my_S3_bucket/parquet_export") 

#reload from parquet filed
reload_df = sqlContext.read.parquet("s3n://my_S3_bucket/parquet_export") 
print reload_df.printSchema()

Recommended Posts

Exemple de collecte de code Spark Dataframe
[Remarque] Exemple de code CADquery
[Python] Exemple de code pour la grammaire Python
Résumé de l'exemple de code de traitement parallèle / parallèle Python
Paiza Code Girl Collection #Gull this
Extraire les informations du code postal à l'aide de Spark