[PYTHON] PySpark-Datenmanipulation

Dieser Artikel fasst die Funktionen und Datenoperationen von PySpark zusammen.

Über PySpark

Funktionen von PySpark (Spark)

Partitionierung und Bucketing

Ein wichtiges "Apache Hive" -Konzept für den Betrieb von PySpark

Weitere Informationen zum Partitionieren und Bucketing finden Sie unter hier.

Überprüfen des Nutzungsstatus von Computerressourcen

Wenn Ihre Daten langsam sind, ist es eine gute Idee, Ganglia zu verwenden, um zu sehen, wie Ihre Rechenressourcen verwendet werden.

Insbesondere ist das Netzwerkkommunikationsvolumen (= Datenübertragungsvolumen) gering und die Verarbeitung nimmt häufig Zeit in Anspruch. In diesem Fall können Sie das Problem möglicherweise lösen, indem Sie die folgenden Maßnahmen ergreifen.

PySpark-Code-Snippet

Es wird angenommen, dass die folgenden Variablen generiert wurden.

Hinweis

  1. Der von > df.show () angezeigte Inhalt ist möglicherweise nicht immer korrekt, um das Bild zu erfassen.
  2. Möglicherweise verwenden Sie plötzlich eine andere Variable als die oben aufgeführten.
  3. Der Pfad beginnt bei s3: //, da er unter AWS ausgeführt werden soll.
  4. Wir planen, nacheinander hinzuzufügen und zu ändern.
  5. ** Wenn Sie einen Fehler in der Syntax oder beim Schreiben machen, hinterlassen Sie bitte einen Kommentar. ** ** **

import

Die folgenden Elemente werden hauptsächlich bei Verwendung von "Funken" importiert.

# from pyspark.sql.functions import *In manchen Fällen,
#Ich möchte den Namespace der Funktion mit F angeben, weil es einfacher zu verstehen ist.
#F verstößt jedoch gegen PEP8. .. ..
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType, TimestampType, StringType
from pyspark.sql.window import Window

Einstellungen für die Ausführungsumgebung

spark.conf.set("spark.sql.session.timeZone", "Asia/Tokyo")

initialize spark Nicht erforderlich für "JupyterHub" von "EMR", aber bei Ausführung mit Python-Skript Die Instanzinitialisierung von "Funken" ist erforderlich.

# spark initialization
spark = SparkSession.builder.appName("{your app name here}").getOrCreate()

Daten lesen

df = spark.read.parquet(path)
# dt=2020-01-01/Lesen Sie alle Dateien unten
df = spark.read.parquet("s3://some-bucket/data/dt=2020-01-01/*.parquet")

# dt=2020-01-01/Von dt=2020-01-31/Lesen Sie alle Dateien unten
df = spark.read.parquet("s3://some-bucket/data/dt=2020-01-*/*.parquet")
#Liest Dateien in Pfaden, die in der Pfadliste enthalten sind
df = spark.read.parquet(*paths)
#Partition zur Spalte hinzufügen und lesen
df = spark.read.option("basePath", parent_path).parquet(*paths)

Durch Speichern des Ergebnisses der Verzögerungsauswertung im Speicher wird eine Hochgeschwindigkeitsverarbeitung möglich. Es ist besser, häufig verwendete Daten zwischenzuspeichern () und sie insbesondere nach der Verarbeitung zu verwenden.

#On-Memory-Cache
df = df.cache()
#Oder
#On-Memory-Cache Standardmäßig kann das Cache-Ziel mit optionalem Argument in Speicher usw. geändert werden
df = df.persist()
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)

Datenausgabe

#csv (Header wird in diesem Fall nicht angegeben)
df.write.csv(path)

# parquet
df.write.parquet(path)
#Im Fall von csv wird es nur angegeben, wenn die Ausgabeeinstellung des Headers festgelegt ist.
df.write.mode("overwrite").option("header", "True").csv(path)
# or
df.write.mode("overwrite").csv(path, header=True)
#Bei Parkett wird es standardmäßig ausgegeben, auch wenn kein Header angegeben ist.
df.write.parquet(path)
# gzip with csv
df.write.csv(path, compression="gzip")

#bissig mit Parkett (sollte standardmäßig bissig komprimiert sein?)
df.write.option("compression", "snappy").parquet(path)

Im folgenden Beispiel wird es in den Ordner "/ dt = {dt_col} / count = {count_col} / {file} .parquet" ausgegeben.

df.repartition("dt", "count").write.partitionBy("dt", "count").parqeut(path)

Wenn Sie nach mehreren Prozessen eine Koaleszenz durchführen, verlangsamt sich die Verarbeitungsgeschwindigkeit. Wenn möglich, ist es besser, die Datei normal auszugeben und den Lesevorgang dann erneut zu koaleszieren.

#Kann nach mehreren Prozessen langsam sein
df.coalesce(1).write.csv(path, header=True)

#Wenn möglich empfohlen (Ausgabe → Lesen → Ausgabe)
df.write.parquet(path)
alt_df = spark.read.parquet(path)
alt_df.coalesce(1).write.csv(path, header=True)
df.repartition(20).write.parquet(path)
# write.mode()Argumente, die in verwendet werden können'overwrite', 'append', 'ignore', 'error', 'errorifexists'
#Ich benutze oft Überschreiben
#Normalerweise tritt ein Fehler auf, wenn die Datei im Ausgabezielordner vorhanden ist.
df.write.parquet(path)

#Wenn Sie überschreiben möchten
df.write.mode("overwrite").parquet(path)

#Wenn Sie zum aktuellen Ordner hinzufügen möchten
df.write.mode("append").parquet(path)

Datenrahmengenerierung

Dies ist eine Methode zum programmgesteuerten Erstellen eines Datenrahmens, nicht zum Lesen einer Datei.

#Erstellen Sie einen einspaltigen Datenrahmen
id_list = ["A001", "A002", "B001"]
df = spark.createDataFrame(id_list, StringType()).toDF("id")
#Das Element im Inneren ist Tupel,Geben Sie abschließend den Namen der Spalte an
df = spark.createDataFrame([
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
], ["id", "code", "name"])

> df.show()
+---+-----+-----+
| id| code| name|
+---+-----+-----+
|  a| null| null|
|  a|code1| null|
|  a|code2|name2|
+---+-----+-----+

# =======================
#Beim einmaligen Erstellen mit rdd
rdd = sc.parallelize(
    [
        (0, "A", 223, "201603", "PORT"), 
        (0, "A", 22, "201602", "PORT"), 
        (0, "A", 422, "201601", "DOCK"), 
        (1, "B", 3213, "201602", "DOCK"), 
        (1, "B", 3213, "201601", "PORT"), 
        (2, "C", 2321, "201601", "DOCK")
    ]
)
df_data = spark.createDataFrame(rdd, ["id","type", "cost", "date", "ship"])

> df.show()
+---+----+----+------+----+
| id|type|cost|  date|ship|
+---+----+----+------+----+
|  0|   A| 223|201603|PORT|
|  0|   A|  22|201602|PORT|
|  0|   A| 422|201601|DOCK|
|  1|   B|3213|201602|DOCK|
|  1|   B|3213|201601|PORT|
|  2|   C|2321|201601|DOCK|
+---+----+----+------+----+

Spalte hinzufügen (withColumn ())

In PySpark wird die Analyse häufig mithilfe des "Prozesses zum Hinzufügen einer neuen Spalte" durchgeführt.

# new_col_Erstellen Sie eine neue Spalte mit dem Namen name und geben Sie ihr einen Literalwert (= Konstante) von 1.
df = df.withColumn("new_col_name", F.lit(1))
#Geben Sie den Pfad der gelesenen Datei an
df = df.withColumn("file_path", F.input_file_name())

#Rufen Sie den Dateinamen aus dem Pfad der gelesenen Datei ab
df = df.withColumn("file_name", F.split(col("file_path"), "/").getItem({int:Letzter Indexwert}))
#Wird durch die Zeichenfolge angegeben
df = df.withColumn("total_count", F.col("total_count").cast("double"))

#Spezifiziert durch PySpark-Typen
df = df.withColumn("value", F.lit("1").cast(StringType()))
#Wenn Sie eine Wertespalte entsprechend den Bedingungen hinzufügen möchten
# F.when(condtion, value).otherwise(else_value)
df = df.withColumn("is_even", F.when(F.col("number") % 2 == 0, 1).otherwise(0))

#Bei mehreren Bedingungen
df = df.withColumn("search_result", F.when( (F.col("id") % 2 == 0) & (F.col("room") % 2 == 0), 1).otherwise(0))

df = df.withColumn("is_touched", F.col("value").isNotNull())
df = df.withColumn("replaced_id", F.regexp_replace(F.col("id"), "A", "C"))
# date time -> epoch time
df = df.withColumn("epochtime", F.unix_timestamp(F.col("timestamp"), "yyyy-MM-dd HH:mm:ssZ"))

# epoch time -> date time
# 1555259647 -> 2019-04-14 16:34:07
df = df.withColumn("datetime", F.to_timestamp(df["epochtime"]))

# datetime -> string
# 2019-04-14 16:34:07 -> 2019-04-14
string_format = "yyyy-MM-dd"
df = df.withColumn("dt", F.date_format(F.col("datetime"), string_format))

# epoch time:Eine Folge von Zahlen mit ungefähr 10 Ziffern. Anzahl der Sekunden seit dem 1. Januar 1970
df = df.withColumn("hour", F.hour(F.col("epochtime")))
df = df.withColumn("hour", F.hour(F.col("timestamp")))

#Kürzen Sie datetime auf die angegebene Zeitbreite
df = df.withColumn("hour", F.date_trunc("hour", "datetime"))
df = df.withColumn("week", F.date_trunc("week", "datetime"))

Es gibt viele andere Funktionen in Takutan withColumn. Bitte beachten Sie auch die Referenzseite.

Datenrahmen kombinieren

Die Methode zum horizontalen / vertikalen Verbinden von zwei DataFrames lautet "join () / union ()".

#Geben Sie Spalten an, mit denen verbunden werden soll
df = left_df.join(right_df, on="id")

# data-Für verschiedene Spalten für jeden Frame
df = left_df.join(right_df, left_df.id_1 == right_df.id_2)

#Sie können auch die Kombinationsmethode angeben
# how:= inner, left, right, left_semi, left_anti, cross, outer, full, left_outer, right_outer
df = left_df.join(right_df, on="id", how="inner")
df = left_df.join(right_df, on=["id", "dt"])
df = left_df.join(F.broadcast(right_df), on="id")
df = upper_df.union(bottom_df)

Spaltenoperation (umbenennen, löschen, auswählen)

Es wird häufig beim Lesen von CSV ohne Spaltennamen verwendet.

#Wenn es keinen Spaltennamen gibt`_c0`Von`_c{n}`Wird der Spaltenname gegeben
df = df.withColumnRenamed("_c0", "id")
df = df.select("id")
df = df.select("id").distinct()
# count()Wird oft in Kombination mit verwendet
#Beispiel: Eindeutige Nummer einer bestimmten ID
print(df.select("id").distinct().count())
df = df.drop("id")
# simple
df = df.dropna()
# using subset
df = df.na.drop(subset=["lat", "lon"])
#Einfacher Fall
df = df.select("id").select(F.collect_list("id"))
id_list = df.first()[0]

> id_list => ["A001", "A002", "B001"]

#Kann auch in Kombination mit groupBy verwendet werden
df = df.groupBy("id").agg(F.collect_set("code"), F.collect_list("name"))

> 
+---+-----------------+------------------+
| id|collect_set(code)|collect_list(name)|
+---+-----------------+------------------+
|  a|   [code1, code2]|           [name2]|
+---+-----------------+------------------+
#Rufen Sie den Wert des Datenrahmens direkt ab
df = df.groupBy().avg()
avg_attribute = df.collect()[0]

> print(avg_attribute["avg({col_name})"])
{averaged_value}

filter

Sie können F.col () verwenden, um die Filterung auf bestimmte Spalten anzuwenden

# using spark.function
df = df.filter(F.col("id") == "A001")

# pandas-like
df = df.filter(df['id'] == "A001")
df = df.filter(df.id == "A001")

Wenn möglich, sollten Sie jedoch einen Spark-Datenrahmen aus date_list erstellen und diesem beitreten.

df = df.filter(F.col("dt").isin(date_list))

orderBy

Das Sortieren ist nicht für die verteilte Verarbeitung geeignet, daher ist es besser, nicht so viel zu tun.

#Nur eine Spalte
df = df.orderBy("count", ascending=False)

#Sortierung mit mehreren Bedingungen
df = df.orderBy(F.col("id").asc(), F.col("cound").desc())

groupBy (aggregate)

# count()
df = df.groupBy("id").count()

# multiple
# alias()Der Spaltenname wird von der Funktion geändert
#Beispiel: Benutzeraggregation
df = df.groupBy("id").agg(
  F.count(F.lit(1)).alias("count"),
  F.mean(F.col("diff")).alias("diff_mean"),
  F.stddev(F.col("diff")).alias("diff_stddev"),
  F.min(F.col("diff")).alias("diff_min"),
  F.max(F.col("diff")).alias("diff_max")
)

> df.show()
(Kürzung)

# =======================
#Beispiel: Aggregation nach Datum und Uhrzeit des Benutzers
df = df.groupBy("id", "dt").agg(
  F.count(F.lit(1)).alias("count")
  )
  
> df.show()
+---+-----------+------+
| id|         dt| count|
+---+-----------+------+
|  a| 2020/01/01|     7|
|  a| 2020/01/02|     5|
|  a| 2020/01/03|     4|
+---+-----------+------+

# ===========================
#Beispiel: Aggregation nach Benutzerdatum / -zeit / -ort
df = df.groupBy("id", "dt", "location_id").agg(
  F.count(F.lit(1)).alias("count")
  )

> df.show()
+---+-----------+------------+------+
| id|         dt| location_id| count|
+---+-----------+------------+------+
|  a| 2020/01/01|           A|     2|
|  a| 2020/01/01|           B|     3|
|  a| 2020/01/01|           C|     2|
:   :           :            :      :
+---+-----------+------------+------+
#Beispiel: Anzahl der Benutzer eindeutig nach Datum
df = df.groupBy("dt").agg(countDistinct("id").alias("id_count"))

> df.show()
+-----------+---------+
|         dt| id_count|
+-----------+---------+
| 2020/01/01|        7|
| 2020/01/02|        5|
| 2020/01/03|        4|
+-----------+---------+

# ===============================
#Beispiel: Anzahl der Tage, an denen jeder Benutzer mindestens einmal Kontakt hatte
df = df.groupBy("id").agg(countDistinct("dt").alias("dt_count"))

> df.show()
+---+---------+
| id| dt_count|
+---+---------+
|  a|       10|
|  b|       15|
|  c|        4|
+---+---------+
group_columns = ["id", "dt"]
df = ad_touched_visit_df.groupBy(*group_columns).count()

Fensterfunktion

w = Window().orderBy(F.col("id"))
df = df.withColumn("row_num", F.row_number().over(w))
#Fügen Sie die Daten aus der vorherigen Zeile als Spalte hinzu
w = Window.partitionBy("id").orderBy("timestamp")
df = df.withColumn("prev_timestamp", F.lag(df["timestamp"]).over(w))

Schleifenverarbeitung

Es ist stark veraltet, da es nicht mit verteilten Umgebungen kompatibel ist. Es ist besser, es nur zu verwenden, wenn es für sein muss.

for row in df.rdd.collect():
  do_some_with(row['id'])

Referenzseite

Recommended Posts

PySpark-Datenmanipulation
Datenmanipulation mit Pandas!
Bearbeiten von Daten in Python-try mit Pandas_plyr
[Übersetzung] scicit-learn 0.18 Tutorial Manipulation von Textdaten
Pandas Data Manipulation Column Join, Spaltenaustausch, Spaltenumbenennung
Datenverarbeitung
[Python] Kapitel 04-02 Verschiedene Datenstrukturen (Listenmanipulation)
[Python] Kapitel 04-07 Verschiedene Datenstrukturen (Wörterbuchmanipulation)
Lassen Sie uns MySQL-Daten mit Python bearbeiten