Ab Spark Ver 1.3 wurde eine Funktion namens Spark Dataframe hinzugefügt. Die Funktionen sind wie folgt.
filter
und select
können Sie Zeilen und Spalten extrahieren, die die Bedingungen erfüllen.groupBy → agg
durchgeführt werdenMit anderen Worten, es ist einfacher Code und eine schnellere Verarbeitung als das Schreiben in RDD "Map" oder "Filter". Unter der Annahme, dass die Datenvorverarbeitung von RDD durchgeführt wird, ist es besser, sie sofort mit maji in Dataframe einzulesen. Da die Memos von Dataframe verstreut sind, werde ich den Beispielcode sichern, wenn ich ein Memorandum habe.
Darüber hinaus sollte beachtet werden
Verwenden Sie das Zugriffsprotokoll als Betreff. Zugriffsprotokoll (CSV), das in Buch der Technical Review Company zur CSV-Datei verwendet wird Klicken Sie hier für Nao Rin. Der Inhalt von csv ist das folgende Protokoll mit 3 Datumsinformationen, Benutzer-ID, Kampagnen-ID
click.at user.id campaign.id
2015/4/27 20:40 144012 Campaign077
2015/4/27 0:27 24485 Campaign063
2015/4/27 0:28 24485 Campaign063
2015/4/27 0:33 24485 Campaign038
Lesen Sie csv und machen Sie es RDD. Löschen Sie die Kopfzeile in der ersten Zeile und lesen Sie die erste Spalte als Datum / Uhrzeit-Objekt.
import json, os, datetime, collections, commands
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
if not os.path.exists("./click_data_sample.csv"):
print "csv file not found at master node, will download and copy to HDFS"
commands.getoutput("wget -q http://image.gihyo.co.jp/assets/files/book/2015/978-4-7741-7631-4/download/click_data_sample.csv")
commands.getoutput("hadoop fs -copyFromLocal -f ./click_data_sample.csv /user/hadoop/")
whole_raw_log = sc.textFile("/user/hadoop/click_data_sample.csv")
header = whole_raw_log.first()
whole_log = whole_raw_log.filter(lambda x:x !=header).map(lambda line: line.split(","))\
.map(lambda line: [datetime.datetime.strptime(line[0].replace('"', ''), '%Y-%m-%d %H:%M:%S'), int(line[1]), line[2].replace('"', '')])
whole_log.take(3)
#[[datetime.datetime(2015, 4, 27, 20, 40, 40), 144012, u'Campaign077'],
# [datetime.datetime(2015, 4, 27, 0, 27, 55), 24485, u'Campaign063'],
# [datetime.datetime(2015, 4, 27, 0, 28, 13), 24485, u'Campaign063']]
Datenrahmen können mit sqlContext.createDataFrame (my_rdd, my_schema)
erstellt werden, indem der Spaltenname und jeder Typ (TimestampType
, IntegerType
, StringType
usw.) angegeben werden, wenn eine ursprüngliche RDD vorhanden ist. Ich werde. Siehe hier für die Definition des Schemas.
printSchema ()
, dtypes
ist die Schemainformation,count ()
ist die Anzahl der Zeilen und show (n)
ist die ersten n Datensätze.
fields = [StructField("access_time", TimestampType(), True), StructField("userID", IntegerType(), True), StructField("campaignID", StringType(), True)]
schema = StructType(fields)
whole_log_df = sqlContext.createDataFrame(whole_log, schema)
print whole_log_df.count()
print whole_log_df.printSchema()
print whole_log_df.dtypes
print whole_log_df.show(5)
#327430
#root
# |-- access_time: timestamp (nullable = true)
# |-- userID: integer (nullable = true)
# |-- campaignID: string (nullable = true)
#
#[('access_time', 'timestamp'), ('userID', 'int'), ('campaignID', 'string')]
#
#+--------------------+------+-----------+
#| access_time|userID| campaignID|
#+--------------------+------+-----------+
#|2015-04-27 20:40:...|144012|Campaign077|
#|2015-04-27 00:27:...| 24485|Campaign063|
#|2015-04-27 00:28:...| 24485|Campaign063|
#|2015-04-27 00:33:...| 24485|Campaign038|
#|2015-04-27 01:00:...| 24485|Campaign063|
Verwenden Sie spark-csv
, eines der Spark-Pakete, um die von csv gelesenen Daten in einen Datenrahmen zu konvertieren. Es ist einfacher, Databricks / Spark-CSV zu verwenden. Sofern nicht anders angegeben, wird alles als Zeichenfolge gelesen. Wenn Sie jedoch "inferSchema" angeben, ist dies eine gute Analogie.
whole_log_df_2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/user/hadoop/click_data_sample.csv")
print whole_log_df_2.printSchema()
print whole_log_df_2.show(5)
#root
# |-- click.at: string (nullable = true)
# |-- user.id: string (nullable = true)
# |-- campaign.id: string (nullable = true)
#
#+-------------------+-------+-----------+
#| click.at|user.id|campaign.id|
#+-------------------+-------+-----------+
#|2015-04-27 20:40:40| 144012|Campaign077|
#|2015-04-27 00:27:55| 24485|Campaign063|
#|2015-04-27 00:28:13| 24485|Campaign063|
#|2015-04-27 00:33:42| 24485|Campaign038|
#|2015-04-27 01:00:04| 24485|Campaign063|
whole_log_df_3 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/user/hadoop/click_data_sample.csv")
print whole_log_df_3.printSchema()
#root
# |-- click.at: timestamp (nullable = true)
# |-- user.id: integer (nullable = true)
# |-- campaign.id: string (nullable = true)
Übrigens ist es mühsam, "." Im Spaltennamen zu haben, sodass Sie es mit "withColumnRenamed" umbenennen können (Sie können einen anderen umbenannten Datenrahmen erstellen).
whole_log_df_4 = whole_log_df_3.withColumnRenamed("click.at", "access_time")\
.withColumnRenamed("user.id", "userID")\
.withColumnRenamed("campaign.id", "campaignID")
print whole_log_df_4.printSchema()
#root
# |-- access_time: timestamp (nullable = true)
# |-- userID: integer (nullable = true)
# |-- campaignID: string (nullable = true)
Verwenden Sie sqlContext.read.json
, um die aus der json-Datei gelesenen Daten so wie sie sind in einen Datenrahmen zu konvertieren. Behandeln Sie jede Dateizeile als 1 json-Objekt. Wenn ein Schlüssel nicht vorhanden ist, wird "null" eingegeben.
# test_json.json contains following 3 lines, last line doesn't have "campaignID" key
#
#{"access_time": "2015-04-27 20:40:40", "userID": "24485", "campaignID": "Campaign063"}
#{"access_time": "2015-04-27 00:27:55", "userID": "24485", "campaignID": "Campaign038"}
#{"access_time": "2015-04-27 00:27:55", "userID": "24485"}
df_json = sqlContext.read.json("/user/hadoop/test_json.json")
df_json.printSchema()
df_json.show(5)
#root
# |-- access_time: string (nullable = true)
# |-- campaignID: string (nullable = true)
# |-- userID: string (nullable = true)
#
#+-------------------+-----------+------+
#| access_time| campaignID|userID|
#+-------------------+-----------+------+
#|2015-04-27 20:40:40|Campaign063| 24485|
#|2015-04-27 00:27:55|Campaign038| 24485|
#|2015-04-27 00:27:55| null| 24485|
#+-------------------+-----------+------+
Verwenden Sie sqlContext.read.parquet
, um die aus der Parkettdatei gelesenen Daten so wie sie sind in einen Datenrahmen zu konvertieren. Wenn Sie den Ordner angeben, in dem sich die Parkettdatei befindet, werden die Parkettdateien unter diesem Ordner in einem Stapel gelesen.
sqlContext.read.parquet("/user/hadoop/parquet_folder/")
Dies ist ein Beispiel, das den Datenrahmen mit SQL-Anweisungen abfragt. Wenn Sie Dataframe mit registerTempTable
einen SQL-Tabellennamen geben, können Sie ihn als SQL-Tabellennamen bezeichnen. Der Rückgabewert von "sqlContext.sql (SQL-Anweisung)" ist ebenfalls ein Datenrahmen.
Es ist möglich, Unterabfragen zu beschreiben. Beachten Sie jedoch, dass aus irgendeinem Grund ein Syntaxfehler auftritt, wenn Sie der Seite "Unterabfragen" keinen Alias hinzufügen.
#Einfache SQL-Abfrage
whole_log_df.registerTempTable("whole_log_table")
print sqlContext.sql(" SELECT * FROM whole_log_table where campaignID == 'Campaign047' ").count()
#18081
print sqlContext.sql(" SELECT * FROM whole_log_table where campaignID == 'Campaign047' ").show(5)
#+--------------------+------+-----------+
#| access_time|userID| campaignID|
#+--------------------+------+-----------+
#|2015-04-27 05:26:...| 14151|Campaign047|
#|2015-04-27 05:26:...| 14151|Campaign047|
#|2015-04-27 05:26:...| 14151|Campaign047|
#|2015-04-27 05:27:...| 14151|Campaign047|
#|2015-04-27 05:28:...| 14151|Campaign047|
#+--------------------+------+-----------+
#Beim Einfügen von Variablen in SQL-Anweisungen
for count in range(1, 3):
print "Campaign00" + str(count)
print sqlContext.sql("SELECT count(*) as access_num FROM whole_log_table where campaignID == 'Campaign00" + str(count) + "'").show()
#Campaign001
#+----------+
#|access_num|
#+----------+
#| 2407|
#+----------+
#
#Campaign002
#+----------+
#|access_num|
#+----------+
#| 1674|
#+----------+
#Für Unterabfrage:
print sqlContext.sql("SELECT count(*) as first_count FROM (SELECT userID, min(access_time) as first_access_date FROM whole_log_table GROUP BY userID) subquery_alias WHERE first_access_date < '2015-04-28'").show(5)
#+------------+
#|first_count |
#+------------+
#| 20480|
#+------------+
Dies ist eine einfache Suchfunktion für Dataframe. Die obige SQL-Anweisung ähnelt in ihrer Funktion der Abfrage, aber Filter und Auswahl werden als einfache Suchfunktionen positioniert. filter
extrahiert die Zeilen, die die Bedingungen erfüllen, und select
extrahiert die Spalten. Beachten Sie, dass sich die Grammatik geringfügig vom RDD-Filter unterscheidet.
#Sample for filter
print whole_log_df.filter(whole_log_df["access_time"] < "2015-04-28").count()
#41434
print whole_log_df.filter(whole_log_df["access_time"] > "2015-05-01").show(3)
#+--------------------+------+-----------+
#| access_time|userID| campaignID|
#+--------------------+------+-----------+
#|2015-05-01 22:11:...|114157|Campaign002|
#|2015-05-01 23:36:...| 93708|Campaign055|
#|2015-05-01 22:51:...| 57798|Campaign046|
#+--------------------+------+-----------+
#Sample for select
print whole_log_df.select("access_time", "userID").show(3)
#+--------------------+------+
#| access_time|userID|
#+--------------------+------+
#|2015-04-27 20:40:...|144012|
#|2015-04-27 00:27:...| 24485|
#|2015-04-27 00:28:...| 24485|
#+--------------------+------+
groupBy
bietet ähnliche Funktionen wie RDDs reductByKey
, aber groupBy
ist die Methode hier. Durch anschließenden Aufruf von .sql.html # pyspark.sql.GroupedData) können verschiedene Aggregationsfunktionen realisiert werden. Typisch sind "agg" und "count".
Führt groupBy
mit categoryID
als Schlüssel aus und zählt die Anzahl der Datensätze mit count ()
. Wenn Sie in groupBy
mehrere Schlüssel auflisten, wird die Kombination als Schlüssel für groupBy verwendet.
print whole_log_df.groupBy("campaignID").count().sort("count", ascending=False).show(5)
#+-----------+-----+
#| campaignID|count|
#+-----------+-----+
#|Campaign116|22193|
#|Campaign027|19206|
#|Campaign047|18081|
#|Campaign107|13295|
#|Campaign131| 9068|
#+-----------+-----+
print whole_log_df.groupBy("campaignID", "userID").count().sort("count", ascending=False).show(5)
#+-----------+------+-----+
#| campaignID|userID|count|
#+-----------+------+-----+
#|Campaign047| 30292| 633|
#|Campaign086|107624| 623|
#|Campaign047|121150| 517|
#|Campaign086| 22975| 491|
#|Campaign122| 90714| 431|
#+-----------+------+-----+
Sie können GroupBy mit userID
als Schlüssel ausführen und den Durchschnitt und das Maximum / Minimum der aggregierten Ergebnisse berechnen. Gibt das Ergebnis der Ausführung der Funktion "Wert" ("min", "sum", "ave" usw.) in der Spalte "key" mit "agg ({key: value})" zurück. Da der Rückgabewert ein Datenrahmen ist, ist es möglich, die Zeilen mit .filter ()
weiter einzugrenzen.
print whole_log_df.groupBy("userID").agg({"access_time": "min"}).show(3)
#+------+--------------------+
#|userID| min(access_time)|
#+------+--------------------+
#| 4831|2015-04-27 22:49:...|
#| 48631|2015-04-27 22:15:...|
#|143031|2015-04-27 21:52:...|
#+------+--------------------+
print whole_log_df.groupBy("userID").agg({"access_time": "min"}).filter("min(access_time) < '2015-04-28'").count()
#20480
Pivot ist eine neue Funktion aus Spark v1.6 Bietet ähnliche Funktionen wie SQL Pivot. -spark.html). Im Fall von Pivot of Sample-Code ändern sich die vertikale und horizontale Position wie folgt.
agged_df
)pivot_df
)Sie müssen immer groupBy ("Spalten, die vertikal bleiben") aufrufen. Pivot ("Spalten, die Sie von vertikal in horizontal konvertieren möchten"). Sum ("Spalten mit aggregierten Werten") und die drei Methoden in der Kette.
agged_df = whole_log_df.groupBy("userID", "campaignID").count()
print agged_df.show(3)
#+------+-----------+-----+
#|userID| campaignID|count|
#+------+-----------+-----+
#|155812|Campaign107| 4|
#|103339|Campaign027| 1|
#|169114|Campaign112| 1|
#+------+-----------+-----+
#Eine Zelle ohne Wert ist null
pivot_df = agged_df.groupBy("userID").pivot("campaignID").sum("count")
print pivot_df.printSchema()
#root
# |-- userID: integer (nullable = true)
# |-- Campaign001: long (nullable = true)
# |-- Campaign002: long (nullable = true)
# ..
# |-- Campaign133: long (nullable = true)
#Wenn Sie eine Zelle ohne Wert mit 0 füllen möchten
pivot_df2 = agged_df.groupBy("userID").pivot("campaignID").sum("count").fillna(0)
UDF kann in Spark Dataframe verwendet werden. Ich denke, die Hauptverwendung besteht darin, Spalten hinzuzufügen. Da der Datenrahmen grundsätzlich unveränderlich ist, können Sie den Inhalt der Spalte nicht ändern und erstellen einen weiteren Datenrahmen mit der hinzugefügten Spalte.
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType
def add_day_column(access_time):
return int(access_time.strftime("%Y%m%d"))
my_udf = UserDefinedFunction(add_day_column, IntegerType())
print whole_log_df.withColumn("access_day", my_udf("access_time")).show(5)
#+--------------------+------+-----------+----------+
#| access_time|userID| campaignID|access_day|
#+--------------------+------+-----------+----------+
#|2015-04-27 20:40:...|144012|Campaign077| 20150427|
#|2015-04-27 00:27:...| 24485|Campaign063| 20150427|
#|2015-04-27 00:28:...| 24485|Campaign063| 20150427|
#|2015-04-27 00:33:...| 24485|Campaign038| 20150427|
#|2015-04-27 01:00:...| 24485|Campaign063| 20150427|
#+--------------------+------+-----------+----------+
Die UDF-Notation kann auch mit der Lambda-Funktion geschrieben werden.
my_udf2 = UserDefinedFunction(lambda x: x + 5, IntegerType())
print whole_log_df.withColumn("userID_2", my_udf2("userID")).show(5)
#+--------------------+------+-----------+--------+
#| access_time|userID| campaignID|userID_2|
#+--------------------+------+-----------+--------+
#|2015-04-27 20:40:...|144012|Campaign077| 144017|
#|2015-04-27 00:27:...| 24485|Campaign063| 24490|
#|2015-04-27 00:28:...| 24485|Campaign063| 24490|
#|2015-04-27 00:33:...| 24485|Campaign038| 24490|
#|2015-04-27 01:00:...| 24485|Campaign063| 24490|
#+--------------------+------+-----------+--------+
Verwenden Sie umgekehrt "df.drop ()", um einen Datenrahmen zu erstellen, der eine bestimmte Spalte entfernen möchte.
print whole_log_df.drop("userID").show(3)
#+--------------------+-----------+
#| access_time| campaignID|
#+--------------------+-----------+
#|2015-04-27 20:40:...|Campaign077|
#|2015-04-27 00:27:...|Campaign063|
#|2015-04-27 00:28:...|Campaign063|
#+--------------------+-----------+
Es ist auch möglich, zwei Datenrahmen zu verbinden. Betrachten Sie hier den Fall, in dem nur das Protokoll eines schweren Benutzers (Benutzer mit 100 oder mehr Zugriffen) aus dem gesamten Protokoll extrahiert wird.
Zunächst werden die Benutzer-ID eines Benutzers mit 100 oder mehr Zugriffen und die Anzahl der Zugriffe nach ".groupBy (" userID "). Count ()" zusammengefasst und nach "filter" auf 100 oder mehr eingegrenzt.
heavy_user_df1 = whole_log_df.groupBy("userID").count()
heavy_user_df2 = heavy_user_df1.filter(heavy_user_df1 ["count"] >= 100)
print heavy_user_df2 .printSchema()
print heavy_user_df2 .show(3)
print heavy_user_df2 .count()
#root
# |-- userID: integer (nullable = true)
# |-- count: long (nullable = false)
#
#+------+-----+
#|userID|count|
#+------+-----+
#| 84231| 134|
#| 13431| 128|
#|144432| 113|
#+------+-----+
#
#177
Wenn Sie die Join-Methode im ursprünglichen Dataframe (links) aufrufen und die Join-Bedingung mit dem Join-Partner (rechts) schreiben, können Sie den Dataframe wie einen SQL-Join verbinden.
Das Join-Format sollte "inner", "Outer", "left_outer", "rignt_outer" usw. sein, aber anders als "inner" funktioniert es nicht wie beabsichtigt (und wird als "Outer" verarbeitet). Zur Zeit versuche ich, eine äußere Verknüpfung mit "inner" herzustellen und dann unnötige Spalten mit "drop" zu löschen. Weitere Informationen usw. finden Sie auf der offiziellen Seite usw..
Durch den folgenden Verknüpfungsprozess konnten wir das Protokoll von 38.729 Zeilen abrufen, die Benutzern (177 Personen) mit 100 oder mehr Zugriffen entsprechen (das Gesamtprotokoll umfasst ca. 320.000 Zeilen).
joinded_df = whole_log_df.join(heavy_user_df2, whole_log_df["userID"] == heavy_user_df2["userID"], "inner").drop(heavy_user_df2["userID"]).drop("count")
print joinded_df.printSchema()
print joinded_df.show(3)
print joinded_df.count()
#root
# |-- access_time: timestamp (nullable = true)
# |-- campaignID: string (nullable = true)
# |-- userID: integer (nullable = true)
#None
#+--------------------+-----------+------+
#| access_time| campaignID|userID|
#+--------------------+-----------+------+
#|2015-04-27 02:07:...|Campaign086| 13431|
#|2015-04-28 00:07:...|Campaign086| 13431|
#|2015-04-29 06:01:...|Campaign047| 13431|
#+--------------------+-----------+------+
#
#38729
.distinct ()
am Ende des Datenrahmens hinzu.print whole_log_df.columns
#['access_time', 'userID', 'campaignID']
print whole_log_df.select("userID").map(lambda x: x[0]).collect()[:5]
#[144012, 24485, 24485, 24485, 24485]
print whole_log_df.select("userID").distinct().map(lambda x:x[0]).collect()[:5]
#[4831, 48631, 143031, 39631, 80831]
Es gibt zwei Möglichkeiten, einen Datenrahmen wieder in RDD zu konvertieren.
.map
auf.rdd
an#convert to rdd by ".map"
print whole_log_df.groupBy("campaignID").count().map(lambda x: [x[0], x[1]]).take(5)
#[[u'Campaign033', 786], [u'Campaign034', 3867], [u'Campaign035', 963], [u'Campaign036', 1267], [u'Campaign037', 1010]]
# rdd -> normal list can be done with "collect".
print whole_log_df.groupBy("campaignID").count().map(lambda x: [x[0], x[1]]).collect()[:5]
#[[u'Campaign033', 786], [u'Campaign034', 3867], [u'Campaign035', 963], [u'Campaign036', 1267], [u'Campaign037', 1010]]
#convert to rdd by ".rdd" will return "Row" object
print whole_log_df.groupBy("campaignID").rdd.take(3)
#[Row(campaignID=u'Campaign033', count=786), Row(campaignID=u'Campaign034', count=3867), Row(campaignID=u'Campaign035', count=963)]
#`.asDict()` will convert to Key-Value RDD from Row object
print whole_log_df.groupBy("campaignID").rdd.map(lambda x:x.asDict()).take(3)
#[{'count': 786, 'campaignID': u'Campaign033'}, {'count': 3867, 'campaignID': u'Campaign034'}, {'count': 963, 'campaignID': u'Campaign035'}]
Wenn Sie den Datenrahmen in eine Datei im Parkettformat exportieren, können Sie ihn unter Beibehaltung der Schemainformationen in eine Datei exportieren. Wenn das Verzeichnis des zu exportierenden S3-Buckets bereits vorhanden ist, schlägt das Schreiben fehl. Geben Sie einen Verzeichnisnamen an, der noch nicht vorhanden ist.
#write to parquet filed
whole_log_df.select("access_time", "userID").write.parquet("s3n://my_S3_bucket/parquet_export")
#reload from parquet filed
reload_df = sqlContext.read.parquet("s3n://my_S3_bucket/parquet_export")
print reload_df.printSchema()
Recommended Posts