[PYTHON] Manipulation des données PySpark

Cet article résume les fonctionnalités et les opérations sur les données de PySpark.

À propos de PySpark

Caractéristiques de PySpark (Spark)

«Partitionnement» et «compartimentage»

Un concept important de ʻApache Hive` dans le fonctionnement de PySpark

Pour plus d'informations sur le partitionnement et le compartimentage, voir ici.

Vérification de l'état d'utilisation des ressources informatiques

Si vos données sont lentes, il est judicieux d'utiliser Ganglia pour voir comment vos ressources de calcul sont utilisées.

En particulier, le volume de communication réseau (= volume de transfert de données) est faible et le traitement prend souvent du temps. Dans ce cas, vous pourrez peut-être le résoudre en prenant les mesures suivantes.

Extrait de code PySpark

Les variables suivantes sont supposées avoir été générées.

Mise en garde

  1. Le contenu affiché par > df.show () peut ne pas toujours être correct dans le but de saisir l'image.
  2. Vous pouvez soudainement utiliser une variable autre que celles listées ci-dessus.
  3. Le chemin commence à «s3: //» car il est censé s'exécuter sur AWS.
  4. Nous prévoyons d'ajouter et de modifier séquentiellement.
  5. ** Si vous faites une erreur de syntaxe ou d'écriture, veuillez laisser un commentaire. ** **

import

Les éléments suivants sont principalement importés lors de l'utilisation de «spark».

# from pyspark.sql.functions import *Dans certains cas,
#J'aime spécifier l'espace de noms de la fonction avec F car c'est plus facile à comprendre.
#Cependant, F viole PEP8. .. ..
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType, TimestampType, StringType
from pyspark.sql.window import Window

Paramètres de l'environnement d'exécution

spark.conf.set("spark.sql.session.timeZone", "Asia/Tokyo")

initialize spark Ce n'est pas nécessaire sur JupyterHub de ʻEMR`, mais lors de l'exécution avec un script python, L'initialisation de l'instance de «spark» est requise.

# spark initialization
spark = SparkSession.builder.appName("{your app name here}").getOrCreate()

Lecture des données

df = spark.read.parquet(path)
# dt=2020-01-01/Lisez tous les fichiers ci-dessous
df = spark.read.parquet("s3://some-bucket/data/dt=2020-01-01/*.parquet")

# dt=2020-01-01/De dt=2020-01-31/Lisez tous les fichiers ci-dessous
df = spark.read.parquet("s3://some-bucket/data/dt=2020-01-*/*.parquet")
#lit les fichiers dans les chemins inclus dans la liste des chemins
df = spark.read.parquet(*paths)
#Ajouter une partition à la colonne et lire
df = spark.read.option("basePath", parent_path).parquet(*paths)

En sauvegardant le résultat de l'évaluation du retard dans la mémoire, un traitement à grande vitesse devient possible. Il est préférable de mettre en cache () les données fréquemment utilisées et de les utiliser surtout après le traitement.

#Cache en mémoire
df = df.cache()
#Ou
#Cache en mémoire par défaut, la destination du cache peut être changée en stockage, etc. avec un argument facultatif
df = df.persist()
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)

Sortie de données

#csv (l'en-tête n'est pas donné dans ce cas)
df.write.csv(path)

# parquet
df.write.parquet(path)
#Dans le cas de csv, il n'est donné que si le paramètre de sortie de l'en-tête est défini.
df.write.mode("overwrite").option("header", "True").csv(path)
# or
df.write.mode("overwrite").csv(path, header=True)
#Dans le cas du parquet, il est édité par défaut même si l'en-tête n'est pas spécifié.
df.write.parquet(path)
# gzip with csv
df.write.csv(path, compression="gzip")

#accrocheur avec du parquet (devrait-il être compressé par défaut?)
df.write.option("compression", "snappy").parquet(path)

Dans le cas de l'exemple suivant, il sera affiché dans le dossier / dt = {dt_col} / count = {count_col} / {file} .parquet.

df.repartition("dt", "count").write.partitionBy("dt", "count").parqeut(path)

Si vous effectuez une fusion après plusieurs processus, la vitesse de traitement ralentira, donc si possible, il est préférable de sortir le fichier normalement, puis de fusionner à nouveau la lecture.

#Peut être lent après plusieurs processus
df.coalesce(1).write.csv(path, header=True)

#Recommandé si possible (sortie → lecture → sortie)
df.write.parquet(path)
alt_df = spark.read.parquet(path)
alt_df.coalesce(1).write.csv(path, header=True)
df.repartition(20).write.parquet(path)
# write.mode()Arguments pouvant être utilisés dans'overwrite', 'append', 'ignore', 'error', 'errorifexists'
#J'utilise souvent l'écrasement
#Normalement, une erreur se produit si le fichier existe dans le dossier de destination de sortie.
df.write.parquet(path)

#Si vous voulez écraser
df.write.mode("overwrite").parquet(path)

#Si vous souhaitez ajouter au dossier actuel
df.write.mode("append").parquet(path)

Génération de trames de données

Il s'agit d'une méthode de création d'un bloc de données par programme, et non à partir de la lecture d'un fichier.

#Créer un bloc de données à une seule colonne
id_list = ["A001", "A002", "B001"]
df = spark.createDataFrame(id_list, StringType()).toDF("id")
#L'élément à l'intérieur est tuple,Spécifiez enfin le nom de la colonne
df = spark.createDataFrame([
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
], ["id", "code", "name"])

> df.show()
+---+-----+-----+
| id| code| name|
+---+-----+-----+
|  a| null| null|
|  a|code1| null|
|  a|code2|name2|
+---+-----+-----+

# =======================
#Lors de la création à l'aide de rdd une fois
rdd = sc.parallelize(
    [
        (0, "A", 223, "201603", "PORT"), 
        (0, "A", 22, "201602", "PORT"), 
        (0, "A", 422, "201601", "DOCK"), 
        (1, "B", 3213, "201602", "DOCK"), 
        (1, "B", 3213, "201601", "PORT"), 
        (2, "C", 2321, "201601", "DOCK")
    ]
)
df_data = spark.createDataFrame(rdd, ["id","type", "cost", "date", "ship"])

> df.show()
+---+----+----+------+----+
| id|type|cost|  date|ship|
+---+----+----+------+----+
|  0|   A| 223|201603|PORT|
|  0|   A|  22|201602|PORT|
|  0|   A| 422|201601|DOCK|
|  1|   B|3213|201602|DOCK|
|  1|   B|3213|201601|PORT|
|  2|   C|2321|201601|DOCK|
+---+----+----+------+----+

Ajouter une colonne (withColumn ())

Dans PySpark, l'analyse est souvent effectuée en utilisant le "processus d'ajout d'une nouvelle colonne".

# new_col_Créez une nouvelle colonne appelée nom et donnez-lui une valeur littérale (= constante) de 1.
df = df.withColumn("new_col_name", F.lit(1))
#Donnez le chemin du fichier lu
df = df.withColumn("file_path", F.input_file_name())

#Obtenez le nom du fichier à partir du chemin du fichier lu
df = df.withColumn("file_name", F.split(col("file_path"), "/").getItem({int:Dernière valeur d'index}))
#Spécifié par chaîne de caractères
df = df.withColumn("total_count", F.col("total_count").cast("double"))

#Spécifié par les types PySpark
df = df.withColumn("value", F.lit("1").cast(StringType()))
#Si vous souhaitez ajouter une colonne de valeurs selon les conditions
# F.when(condtion, value).otherwise(else_value)
df = df.withColumn("is_even", F.when(F.col("number") % 2 == 0, 1).otherwise(0))

#En cas de conditions multiples
df = df.withColumn("search_result", F.when( (F.col("id") % 2 == 0) & (F.col("room") % 2 == 0), 1).otherwise(0))

df = df.withColumn("is_touched", F.col("value").isNotNull())
df = df.withColumn("replaced_id", F.regexp_replace(F.col("id"), "A", "C"))
# date time -> epoch time
df = df.withColumn("epochtime", F.unix_timestamp(F.col("timestamp"), "yyyy-MM-dd HH:mm:ssZ"))

# epoch time -> date time
# 1555259647 -> 2019-04-14 16:34:07
df = df.withColumn("datetime", F.to_timestamp(df["epochtime"]))

# datetime -> string
# 2019-04-14 16:34:07 -> 2019-04-14
string_format = "yyyy-MM-dd"
df = df.withColumn("dt", F.date_format(F.col("datetime"), string_format))

# epoch time:Une chaîne de nombres d'environ 10 chiffres. Nombre de secondes depuis le 1er janvier 1970
df = df.withColumn("hour", F.hour(F.col("epochtime")))
df = df.withColumn("hour", F.hour(F.col("timestamp")))

#Tronquer datetime à la largeur de temps spécifiée
df = df.withColumn("hour", F.date_trunc("hour", "datetime"))
df = df.withColumn("week", F.date_trunc("week", "datetime"))

Il existe de nombreuses autres fonctions disponibles dans Takutan withColumn. Veuillez également consulter le site de référence.

Combiner des trames de données

La méthode pour joindre deux DataFrames horizontalement / verticalement est join () / union ().

#Spécifiez les colonnes à rejoindre sur
df = left_df.join(right_df, on="id")

# data-Pour différentes colonnes pour chaque cadre
df = left_df.join(right_df, left_df.id_1 == right_df.id_2)

#Vous pouvez également spécifier la méthode de combinaison
# how:= inner, left, right, left_semi, left_anti, cross, outer, full, left_outer, right_outer
df = left_df.join(right_df, on="id", how="inner")
df = left_df.join(right_df, on=["id", "dt"])
df = left_df.join(F.broadcast(right_df), on="id")
df = upper_df.union(bottom_df)

Opération de colonne (renommer, supprimer, sélectionner)

Il est souvent utilisé lors de la lecture de csv sans nom de colonne.

#S'il n'y a pas de nom de colonne`_c0`De`_c{n}`Reçoit le nom de la colonne
df = df.withColumnRenamed("_c0", "id")
df = df.select("id")
df = df.select("id").distinct()
# count()Souvent utilisé en combinaison avec
#Exemple: numéro unique d'un certain identifiant
print(df.select("id").distinct().count())
df = df.drop("id")
# simple
df = df.dropna()
# using subset
df = df.na.drop(subset=["lat", "lon"])
#Cas simple
df = df.select("id").select(F.collect_list("id"))
id_list = df.first()[0]

> id_list => ["A001", "A002", "B001"]

#Peut également être utilisé en combinaison avec groupBy
df = df.groupBy("id").agg(F.collect_set("code"), F.collect_list("name"))

> 
+---+-----------------+------------------+
| id|collect_set(code)|collect_list(name)|
+---+-----------------+------------------+
|  a|   [code1, code2]|           [name2]|
+---+-----------------+------------------+
#Obtenez directement la valeur de la trame de données
df = df.groupBy().avg()
avg_attribute = df.collect()[0]

> print(avg_attribute["avg({col_name})"])
{averaged_value}

filter

Vous pouvez utiliser F.col () pour appliquer le filtrage à des colonnes spécifiques

# using spark.function
df = df.filter(F.col("id") == "A001")

# pandas-like
df = df.filter(df['id'] == "A001")
df = df.filter(df.id == "A001")

Cependant, si possible, vous devez créer un cadre de données Spark à partir de date_list et le rejoindre.

df = df.filter(F.col("dt").isin(date_list))

orderBy

Le tri ne convient pas au traitement distribué, il vaut donc mieux ne pas en faire autant.

#Colonne unique uniquement
df = df.orderBy("count", ascending=False)

#Tri multi-conditions
df = df.orderBy(F.col("id").asc(), F.col("cound").desc())

groupBy (aggregate)

# count()
df = df.groupBy("id").count()

# multiple
# alias()Le nom de la colonne est modifié par la fonction
#Exemple: agrégation d'utilisateurs
df = df.groupBy("id").agg(
  F.count(F.lit(1)).alias("count"),
  F.mean(F.col("diff")).alias("diff_mean"),
  F.stddev(F.col("diff")).alias("diff_stddev"),
  F.min(F.col("diff")).alias("diff_min"),
  F.max(F.col("diff")).alias("diff_max")
)

> df.show()
(réduction)

# =======================
#Exemple: agrégation par date et heure de l'utilisateur
df = df.groupBy("id", "dt").agg(
  F.count(F.lit(1)).alias("count")
  )
  
> df.show()
+---+-----------+------+
| id|         dt| count|
+---+-----------+------+
|  a| 2020/01/01|     7|
|  a| 2020/01/02|     5|
|  a| 2020/01/03|     4|
+---+-----------+------+

# ===========================
#Exemple: agrégation par date / heure / emplacement utilisateur
df = df.groupBy("id", "dt", "location_id").agg(
  F.count(F.lit(1)).alias("count")
  )

> df.show()
+---+-----------+------------+------+
| id|         dt| location_id| count|
+---+-----------+------------+------+
|  a| 2020/01/01|           A|     2|
|  a| 2020/01/01|           B|     3|
|  a| 2020/01/01|           C|     2|
:   :           :            :      :
+---+-----------+------------+------+
#Exemple: nombre d'utilisateurs uniques par date
df = df.groupBy("dt").agg(countDistinct("id").alias("id_count"))

> df.show()
+-----------+---------+
|         dt| id_count|
+-----------+---------+
| 2020/01/01|        7|
| 2020/01/02|        5|
| 2020/01/03|        4|
+-----------+---------+

# ===============================
#Exemple: nombre de jours pendant lesquels chaque utilisateur a été en contact au moins une fois
df = df.groupBy("id").agg(countDistinct("dt").alias("dt_count"))

> df.show()
+---+---------+
| id| dt_count|
+---+---------+
|  a|       10|
|  b|       15|
|  c|        4|
+---+---------+
group_columns = ["id", "dt"]
df = ad_touched_visit_df.groupBy(*group_columns).count()

fonction de fenêtre

w = Window().orderBy(F.col("id"))
df = df.withColumn("row_num", F.row_number().over(w))
#Ajouter les données de la ligne précédente en tant que colonne
w = Window.partitionBy("id").orderBy("timestamp")
df = df.withColumn("prev_timestamp", F.lag(df["timestamp"]).over(w))

traitement en boucle

Il est fortement déconseillé car incompatible avec les environnements distribués. Il est préférable de ne l'utiliser que lorsque cela doit être fait.

for row in df.rdd.collect():
  do_some_with(row['id'])

Site de référence

Recommended Posts

Manipulation des données PySpark
Manipulation des données avec les Pandas!
Manipuler des données en Python-essayez avec Pandas_plyr
[Français] scicit-learn 0.18 Tutorial Manipulation des données de texte
Jointure de colonne de manipulation de données Pandas, permutation de colonne, changement de nom de colonne
Le traitement des données
[Python] Chapitre 04-02 Diverses structures de données (manipulation de liste)
[Python] Chapitre 04-07 Diverses structures de données (manipulation de dictionnaire)
Faisons la manipulation des données MySQL avec Python