[PYTHON] Beispielcode-Sammlung für Spark-Datenrahmen

Einführung: Was ist Spark Dataframe?

Ab Spark Ver 1.3 wurde eine Funktion namens Spark Dataframe hinzugefügt. Die Funktionen sind wie folgt.

Mit anderen Worten, es ist einfacher Code und eine schnellere Verarbeitung als das Schreiben in RDD "Map" oder "Filter". Unter der Annahme, dass die Datenvorverarbeitung von RDD durchgeführt wird, ist es besser, sie sofort mit maji in Dataframe einzulesen. Da die Memos von Dataframe verstreut sind, werde ich den Beispielcode sichern, wenn ich ein Memorandum habe.

Darüber hinaus sollte beachtet werden

Beispielprotokoll laden

Verwenden Sie das Zugriffsprotokoll als Betreff. Zugriffsprotokoll (CSV), das in Buch der Technical Review Company zur CSV-Datei verwendet wird Klicken Sie hier für Nao Rin. Der Inhalt von csv ist das folgende Protokoll mit 3 Datumsinformationen, Benutzer-ID, Kampagnen-ID

click.at	user.id	campaign.id
2015/4/27 20:40	144012	Campaign077
2015/4/27 0:27	24485	Campaign063
2015/4/27 0:28	24485	Campaign063
2015/4/27 0:33	24485	Campaign038

Lesen Sie csv und machen Sie es RDD. Löschen Sie die Kopfzeile in der ersten Zeile und lesen Sie die erste Spalte als Datum / Uhrzeit-Objekt.

import json, os, datetime, collections, commands
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *

if not os.path.exists("./click_data_sample.csv"):
    print "csv file not found at master node, will download and copy to HDFS"
    commands.getoutput("wget -q http://image.gihyo.co.jp/assets/files/book/2015/978-4-7741-7631-4/download/click_data_sample.csv")
    commands.getoutput("hadoop fs -copyFromLocal -f ./click_data_sample.csv /user/hadoop/")

whole_raw_log = sc.textFile("/user/hadoop/click_data_sample.csv")
header = whole_raw_log.first()
whole_log = whole_raw_log.filter(lambda x:x !=header).map(lambda line: line.split(","))\
            .map(lambda line: [datetime.datetime.strptime(line[0].replace('"', ''), '%Y-%m-%d %H:%M:%S'), int(line[1]), line[2].replace('"', '')])

whole_log.take(3)
#[[datetime.datetime(2015, 4, 27, 20, 40, 40), 144012, u'Campaign077'],
# [datetime.datetime(2015, 4, 27, 0, 27, 55), 24485, u'Campaign063'],
# [datetime.datetime(2015, 4, 27, 0, 28, 13), 24485, u'Campaign063']]

So erstellen Sie einen Datenrahmen

** Erstellt aus RDD **

Datenrahmen können mit sqlContext.createDataFrame (my_rdd, my_schema) erstellt werden, indem der Spaltenname und jeder Typ (TimestampType, IntegerType, StringType usw.) angegeben werden, wenn eine ursprüngliche RDD vorhanden ist. Ich werde. Siehe hier für die Definition des Schemas.

printSchema (), dtypes ist die Schemainformation,count ()ist die Anzahl der Zeilen und show (n) ist die ersten n Datensätze.

fields = [StructField("access_time", TimestampType(), True), StructField("userID", IntegerType(), True), StructField("campaignID", StringType(), True)]
schema = StructType(fields)

whole_log_df = sqlContext.createDataFrame(whole_log, schema)
print whole_log_df.count()
print whole_log_df.printSchema()
print whole_log_df.dtypes
print whole_log_df.show(5)

#327430
#root
# |-- access_time: timestamp (nullable = true)
# |-- userID: integer (nullable = true)
# |-- campaignID: string (nullable = true)
#
#[('access_time', 'timestamp'), ('userID', 'int'), ('campaignID', 'string')]
#
#+--------------------+------+-----------+
#|         access_time|userID| campaignID|
#+--------------------+------+-----------+
#|2015-04-27 20:40:...|144012|Campaign077|
#|2015-04-27 00:27:...| 24485|Campaign063|
#|2015-04-27 00:28:...| 24485|Campaign063|
#|2015-04-27 00:33:...| 24485|Campaign038|
#|2015-04-27 01:00:...| 24485|Campaign063|
** Erstellt direkt aus der CSV-Datei **

Verwenden Sie spark-csv, eines der Spark-Pakete, um die von csv gelesenen Daten in einen Datenrahmen zu konvertieren. Es ist einfacher, Databricks / Spark-CSV zu verwenden. Sofern nicht anders angegeben, wird alles als Zeichenfolge gelesen. Wenn Sie jedoch "inferSchema" angeben, ist dies eine gute Analogie.

whole_log_df_2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/user/hadoop/click_data_sample.csv")
print whole_log_df_2.printSchema()
print whole_log_df_2.show(5)

#root
# |-- click.at: string (nullable = true)
# |-- user.id: string (nullable = true)
# |-- campaign.id: string (nullable = true)
#
#+-------------------+-------+-----------+
#|           click.at|user.id|campaign.id|
#+-------------------+-------+-----------+
#|2015-04-27 20:40:40| 144012|Campaign077|
#|2015-04-27 00:27:55|  24485|Campaign063|
#|2015-04-27 00:28:13|  24485|Campaign063|
#|2015-04-27 00:33:42|  24485|Campaign038|
#|2015-04-27 01:00:04|  24485|Campaign063|

whole_log_df_3 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/user/hadoop/click_data_sample.csv")
print whole_log_df_3.printSchema()

#root
# |-- click.at: timestamp (nullable = true)
# |-- user.id: integer (nullable = true)
# |-- campaign.id: string (nullable = true)

Übrigens ist es mühsam, "." Im Spaltennamen zu haben, sodass Sie es mit "withColumnRenamed" umbenennen können (Sie können einen anderen umbenannten Datenrahmen erstellen).

whole_log_df_4 = whole_log_df_3.withColumnRenamed("click.at", "access_time")\
                 .withColumnRenamed("user.id", "userID")\
                 .withColumnRenamed("campaign.id", "campaignID")
print whole_log_df_4.printSchema()

#root
# |-- access_time: timestamp (nullable = true)
# |-- userID: integer (nullable = true)
# |-- campaignID: string (nullable = true)
** Erstellt direkt von json **

Verwenden Sie sqlContext.read.json, um die aus der json-Datei gelesenen Daten so wie sie sind in einen Datenrahmen zu konvertieren. Behandeln Sie jede Dateizeile als 1 json-Objekt. Wenn ein Schlüssel nicht vorhanden ist, wird "null" eingegeben.

# test_json.json contains following 3 lines, last line doesn't have "campaignID" key
#
#{"access_time": "2015-04-27 20:40:40", "userID": "24485", "campaignID": "Campaign063"}
#{"access_time": "2015-04-27 00:27:55", "userID": "24485", "campaignID": "Campaign038"}
#{"access_time": "2015-04-27 00:27:55", "userID": "24485"}

df_json = sqlContext.read.json("/user/hadoop/test_json.json")
df_json.printSchema()
df_json.show(5)

#root
# |-- access_time: string (nullable = true)
# |-- campaignID: string (nullable = true)
# |-- userID: string (nullable = true)
#
#+-------------------+-----------+------+
#|        access_time| campaignID|userID|
#+-------------------+-----------+------+
#|2015-04-27 20:40:40|Campaign063| 24485|
#|2015-04-27 00:27:55|Campaign038| 24485|
#|2015-04-27 00:27:55|       null| 24485|
#+-------------------+-----------+------+
** Erstellt direkt aus Parkett **

Verwenden Sie sqlContext.read.parquet, um die aus der Parkettdatei gelesenen Daten so wie sie sind in einen Datenrahmen zu konvertieren. Wenn Sie den Ordner angeben, in dem sich die Parkettdatei befindet, werden die Parkettdateien unter diesem Ordner in einem Stapel gelesen.

sqlContext.read.parquet("/user/hadoop/parquet_folder/")

Abfrage mit SQL-Anweisung

Dies ist ein Beispiel, das den Datenrahmen mit SQL-Anweisungen abfragt. Wenn Sie Dataframe mit registerTempTable einen SQL-Tabellennamen geben, können Sie ihn als SQL-Tabellennamen bezeichnen. Der Rückgabewert von "sqlContext.sql (SQL-Anweisung)" ist ebenfalls ein Datenrahmen.

Es ist möglich, Unterabfragen zu beschreiben. Beachten Sie jedoch, dass aus irgendeinem Grund ein Syntaxfehler auftritt, wenn Sie der Seite "Unterabfragen" keinen Alias hinzufügen.

#Einfache SQL-Abfrage

whole_log_df.registerTempTable("whole_log_table")

print sqlContext.sql(" SELECT * FROM whole_log_table where campaignID == 'Campaign047' ").count()
#18081
print sqlContext.sql(" SELECT * FROM whole_log_table where campaignID == 'Campaign047' ").show(5)
#+--------------------+------+-----------+
#|         access_time|userID| campaignID|
#+--------------------+------+-----------+
#|2015-04-27 05:26:...| 14151|Campaign047|
#|2015-04-27 05:26:...| 14151|Campaign047|
#|2015-04-27 05:26:...| 14151|Campaign047|
#|2015-04-27 05:27:...| 14151|Campaign047|
#|2015-04-27 05:28:...| 14151|Campaign047|
#+--------------------+------+-----------+


#Beim Einfügen von Variablen in SQL-Anweisungen
for count in range(1, 3):
    print "Campaign00" + str(count)
    print sqlContext.sql("SELECT count(*) as access_num FROM whole_log_table where campaignID == 'Campaign00" + str(count) + "'").show()

#Campaign001
#+----------+
#|access_num|
#+----------+
#|      2407|
#+----------+
#
#Campaign002
#+----------+
#|access_num|
#+----------+
#|      1674|
#+----------+

#Für Unterabfrage:
print sqlContext.sql("SELECT count(*) as first_count FROM (SELECT userID, min(access_time) as first_access_date FROM whole_log_table GROUP BY userID) subquery_alias WHERE first_access_date < '2015-04-28'").show(5)
#+------------+
#|first_count |
#+------------+
#|       20480|
#+------------+

Bedingte Suche mit ** Filter **, ** Auswahl **

Dies ist eine einfache Suchfunktion für Dataframe. Die obige SQL-Anweisung ähnelt in ihrer Funktion der Abfrage, aber Filter und Auswahl werden als einfache Suchfunktionen positioniert. filter extrahiert die Zeilen, die die Bedingungen erfüllen, und select extrahiert die Spalten. Beachten Sie, dass sich die Grammatik geringfügig vom RDD-Filter unterscheidet.

#Sample for filter
print whole_log_df.filter(whole_log_df["access_time"] < "2015-04-28").count()
#41434
print whole_log_df.filter(whole_log_df["access_time"] > "2015-05-01").show(3)
#+--------------------+------+-----------+
#|         access_time|userID| campaignID|
#+--------------------+------+-----------+
#|2015-05-01 22:11:...|114157|Campaign002|
#|2015-05-01 23:36:...| 93708|Campaign055|
#|2015-05-01 22:51:...| 57798|Campaign046|
#+--------------------+------+-----------+

#Sample for select
print whole_log_df.select("access_time", "userID").show(3)
#+--------------------+------+
#|         access_time|userID|
#+--------------------+------+
#|2015-04-27 20:40:...|144012|
#|2015-04-27 00:27:...| 24485|
#|2015-04-27 00:28:...| 24485|
#+--------------------+------+

Aggregieren nach ** groupBy **

groupBy bietet ähnliche Funktionen wie RDDs reductByKey, aber groupBy ist die Methode hier. Durch anschließenden Aufruf von .sql.html # pyspark.sql.GroupedData) können verschiedene Aggregationsfunktionen realisiert werden. Typisch sind "agg" und "count".

Aggregieren nach ** groupBy ** → ** count **

Führt groupBy mit categoryID als Schlüssel aus und zählt die Anzahl der Datensätze mit count (). Wenn Sie in groupBy mehrere Schlüssel auflisten, wird die Kombination als Schlüssel für groupBy verwendet.

print whole_log_df.groupBy("campaignID").count().sort("count", ascending=False).show(5)
#+-----------+-----+
#| campaignID|count|
#+-----------+-----+
#|Campaign116|22193|
#|Campaign027|19206|
#|Campaign047|18081|
#|Campaign107|13295|
#|Campaign131| 9068|
#+-----------+-----+

print whole_log_df.groupBy("campaignID", "userID").count().sort("count", ascending=False).show(5)
#+-----------+------+-----+
#| campaignID|userID|count|
#+-----------+------+-----+
#|Campaign047| 30292|  633|
#|Campaign086|107624|  623|
#|Campaign047|121150|  517|
#|Campaign086| 22975|  491|
#|Campaign122| 90714|  431|
#+-----------+------+-----+
Aggregieren nach ** groupBy ** → ** agg **

Sie können GroupBy mit userID als Schlüssel ausführen und den Durchschnitt und das Maximum / Minimum der aggregierten Ergebnisse berechnen. Gibt das Ergebnis der Ausführung der Funktion "Wert" ("min", "sum", "ave" usw.) in der Spalte "key" mit "agg ({key: value})" zurück. Da der Rückgabewert ein Datenrahmen ist, ist es möglich, die Zeilen mit .filter () weiter einzugrenzen.

print whole_log_df.groupBy("userID").agg({"access_time": "min"}).show(3)
#+------+--------------------+
#|userID|    min(access_time)|
#+------+--------------------+
#|  4831|2015-04-27 22:49:...|
#| 48631|2015-04-27 22:15:...|
#|143031|2015-04-27 21:52:...|
#+------+--------------------+

print whole_log_df.groupBy("userID").agg({"access_time": "min"}).filter("min(access_time) < '2015-04-28'").count()
#20480
Vertikale / horizontale Konvertierung mit ** groupBy ** → ** Pivot **

Pivot ist eine neue Funktion aus Spark v1.6 Bietet ähnliche Funktionen wie SQL Pivot. -spark.html). Im Fall von Pivot of Sample-Code ändern sich die vertikale und horizontale Position wie folgt.

Sie müssen immer groupBy ("Spalten, die vertikal bleiben") aufrufen. Pivot ("Spalten, die Sie von vertikal in horizontal konvertieren möchten"). Sum ("Spalten mit aggregierten Werten") und die drei Methoden in der Kette.

agged_df = whole_log_df.groupBy("userID", "campaignID").count()
print agged_df.show(3)

#+------+-----------+-----+
#|userID| campaignID|count|
#+------+-----------+-----+
#|155812|Campaign107|    4|
#|103339|Campaign027|    1|
#|169114|Campaign112|    1|
#+------+-----------+-----+

#Eine Zelle ohne Wert ist null
pivot_df = agged_df.groupBy("userID").pivot("campaignID").sum("count")
print pivot_df.printSchema()

#root
# |-- userID: integer (nullable = true)
# |-- Campaign001: long (nullable = true)
# |-- Campaign002: long (nullable = true)
# ..
# |-- Campaign133: long (nullable = true)

#Wenn Sie eine Zelle ohne Wert mit 0 füllen möchten
pivot_df2 = agged_df.groupBy("userID").pivot("campaignID").sum("count").fillna(0)

Fügen Sie Spalten mit UDF hinzu

UDF kann in Spark Dataframe verwendet werden. Ich denke, die Hauptverwendung besteht darin, Spalten hinzuzufügen. Da der Datenrahmen grundsätzlich unveränderlich ist, können Sie den Inhalt der Spalte nicht ändern und erstellen einen weiteren Datenrahmen mit der hinzugefügten Spalte.

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType

def add_day_column(access_time):
    return int(access_time.strftime("%Y%m%d"))
    
my_udf = UserDefinedFunction(add_day_column, IntegerType())
print whole_log_df.withColumn("access_day", my_udf("access_time")).show(5)

#+--------------------+------+-----------+----------+
#|         access_time|userID| campaignID|access_day|
#+--------------------+------+-----------+----------+
#|2015-04-27 20:40:...|144012|Campaign077|  20150427|
#|2015-04-27 00:27:...| 24485|Campaign063|  20150427|
#|2015-04-27 00:28:...| 24485|Campaign063|  20150427|
#|2015-04-27 00:33:...| 24485|Campaign038|  20150427|
#|2015-04-27 01:00:...| 24485|Campaign063|  20150427|
#+--------------------+------+-----------+----------+

Die UDF-Notation kann auch mit der Lambda-Funktion geschrieben werden.

my_udf2 = UserDefinedFunction(lambda x: x + 5, IntegerType())
print whole_log_df.withColumn("userID_2", my_udf2("userID")).show(5)

#+--------------------+------+-----------+--------+
#|         access_time|userID| campaignID|userID_2|
#+--------------------+------+-----------+--------+
#|2015-04-27 20:40:...|144012|Campaign077|  144017|
#|2015-04-27 00:27:...| 24485|Campaign063|   24490|
#|2015-04-27 00:28:...| 24485|Campaign063|   24490|
#|2015-04-27 00:33:...| 24485|Campaign038|   24490|
#|2015-04-27 01:00:...| 24485|Campaign063|   24490|
#+--------------------+------+-----------+--------+

Verwenden Sie umgekehrt "df.drop ()", um einen Datenrahmen zu erstellen, der eine bestimmte Spalte entfernen möchte.

print whole_log_df.drop("userID").show(3)

#+--------------------+-----------+
#|         access_time| campaignID|
#+--------------------+-----------+
#|2015-04-27 20:40:...|Campaign077|
#|2015-04-27 00:27:...|Campaign063|
#|2015-04-27 00:28:...|Campaign063|
#+--------------------+-----------+

Verbinden Sie zwei Datenrahmen mit Join

Es ist auch möglich, zwei Datenrahmen zu verbinden. Betrachten Sie hier den Fall, in dem nur das Protokoll eines schweren Benutzers (Benutzer mit 100 oder mehr Zugriffen) aus dem gesamten Protokoll extrahiert wird.

Zunächst werden die Benutzer-ID eines Benutzers mit 100 oder mehr Zugriffen und die Anzahl der Zugriffe nach ".groupBy (" userID "). Count ()" zusammengefasst und nach "filter" auf 100 oder mehr eingegrenzt.

heavy_user_df1 = whole_log_df.groupBy("userID").count()
heavy_user_df2 = heavy_user_df1.filter(heavy_user_df1 ["count"] >= 100)

print heavy_user_df2 .printSchema()
print heavy_user_df2 .show(3)
print heavy_user_df2 .count()

#root
# |-- userID: integer (nullable = true)
# |-- count: long (nullable = false)
#
#+------+-----+
#|userID|count|
#+------+-----+
#| 84231|  134|
#| 13431|  128|
#|144432|  113|
#+------+-----+
#
#177

Wenn Sie die Join-Methode im ursprünglichen Dataframe (links) aufrufen und die Join-Bedingung mit dem Join-Partner (rechts) schreiben, können Sie den Dataframe wie einen SQL-Join verbinden.

Das Join-Format sollte "inner", "Outer", "left_outer", "rignt_outer" usw. sein, aber anders als "inner" funktioniert es nicht wie beabsichtigt (und wird als "Outer" verarbeitet). Zur Zeit versuche ich, eine äußere Verknüpfung mit "inner" herzustellen und dann unnötige Spalten mit "drop" zu löschen. Weitere Informationen usw. finden Sie auf der offiziellen Seite usw..

Durch den folgenden Verknüpfungsprozess konnten wir das Protokoll von 38.729 Zeilen abrufen, die Benutzern (177 Personen) mit 100 oder mehr Zugriffen entsprechen (das Gesamtprotokoll umfasst ca. 320.000 Zeilen).

joinded_df = whole_log_df.join(heavy_user_df2, whole_log_df["userID"] == heavy_user_df2["userID"], "inner").drop(heavy_user_df2["userID"]).drop("count")
print joinded_df.printSchema()
print joinded_df.show(3)
print joinded_df.count()

#root
# |-- access_time: timestamp (nullable = true)
# |-- campaignID: string (nullable = true)
# |-- userID: integer (nullable = true)

#None
#+--------------------+-----------+------+
#|         access_time| campaignID|userID|
#+--------------------+-----------+------+
#|2015-04-27 02:07:...|Campaign086| 13431|
#|2015-04-28 00:07:...|Campaign086| 13431|
#|2015-04-29 06:01:...|Campaign047| 13431|
#+--------------------+-----------+------+
#
#38729

Spalten aus Dataframe extrahieren

print whole_log_df.columns
#['access_time', 'userID', 'campaignID']

print whole_log_df.select("userID").map(lambda x: x[0]).collect()[:5]
#[144012, 24485, 24485, 24485, 24485]

print whole_log_df.select("userID").distinct().map(lambda x:x[0]).collect()[:5]
#[4831, 48631, 143031, 39631, 80831]

Rückkehr vom Datenrahmen zur RDD / Liste

Es gibt zwei Möglichkeiten, einen Datenrahmen wieder in RDD zu konvertieren.

#convert to rdd by ".map"
print whole_log_df.groupBy("campaignID").count().map(lambda x: [x[0], x[1]]).take(5)
#[[u'Campaign033', 786], [u'Campaign034', 3867], [u'Campaign035', 963], [u'Campaign036', 1267], [u'Campaign037', 1010]]

# rdd -> normal list can be done with "collect".
print whole_log_df.groupBy("campaignID").count().map(lambda x: [x[0], x[1]]).collect()[:5]
#[[u'Campaign033', 786], [u'Campaign034', 3867], [u'Campaign035', 963], [u'Campaign036', 1267], [u'Campaign037', 1010]]

#convert to rdd by ".rdd" will return "Row" object
print whole_log_df.groupBy("campaignID").rdd.take(3)
#[Row(campaignID=u'Campaign033', count=786), Row(campaignID=u'Campaign034', count=3867), Row(campaignID=u'Campaign035', count=963)]

#`.asDict()` will convert to Key-Value RDD from Row object
print whole_log_df.groupBy("campaignID").rdd.map(lambda x:x.asDict()).take(3)
#[{'count': 786, 'campaignID': u'Campaign033'}, {'count': 3867, 'campaignID': u'Campaign034'}, {'count': 963, 'campaignID': u'Campaign035'}]

Export von Dataframe in Parkettdatei

Wenn Sie den Datenrahmen in eine Datei im Parkettformat exportieren, können Sie ihn unter Beibehaltung der Schemainformationen in eine Datei exportieren. Wenn das Verzeichnis des zu exportierenden S3-Buckets bereits vorhanden ist, schlägt das Schreiben fehl. Geben Sie einen Verzeichnisnamen an, der noch nicht vorhanden ist.

#write to parquet filed
whole_log_df.select("access_time", "userID").write.parquet("s3n://my_S3_bucket/parquet_export") 

#reload from parquet filed
reload_df = sqlContext.read.parquet("s3n://my_S3_bucket/parquet_export") 
print reload_df.printSchema()

Recommended Posts

Beispielcode-Sammlung für Spark-Datenrahmen
[Hinweis] CADquery-Beispielcode
[Python] Beispielcode für die Python-Grammatik
Beispielcode-Zusammenfassung für die parallele / parallele Python-Verarbeitung
paiza Code Girl Collection #Gull this
Postleitzahlinformationen mit Funken extrahieren