[PYTHON] Remplissez la valeur manquante (null) de DataFrame avec les valeurs avant et après avec pyspark

Aperçu

Dans pyspark (Spark SQL), ffill (forward fill) et [bfill] dans pandas ](Https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.bfill.html) (remplissage vers l'arrière) n'existe pas par défaut. Par conséquent, si un traitement précis est nécessaire, il est nécessaire de le concevoir vous-même. (Mémo personnel)

Références (réponse)

Il semble que vous devriez le faire comme le lien ci-dessus

Exemple pratique

Les références ont presque résolu le problème, mais je vais essayer.

Environnement de vérification:

Préparation

#Importation des bibliothèques requises
import sys
from typing import (
    Union,
    List,
)

import numpy as np
import pandas as pd
import pyspark.sql
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    first,
    last,
)
from pyspark.sql.window import Window

#Générer une session Spark
spark = SparkSession.builder.getOrCreate()

numpy, pandas sont principalement inclus pour la création de données de test

Implémenter ffill et bfill en fonction

def ffill(
    target: str,
    partition: Union[str, List[str]],
    sort_key: str,
) -> pyspark.sql.Column:
    """
    forward fill
    
    Args:
        target:Colonne pour laquelle la valeur nulle est remplie en avant
        partition:Colonne pour regrouper les enregistrements (liste pour plusieurs)
        sort_key:Colonne de détermination de la commande
    """
    window = Window.partitionBy(partition) \
         .orderBy(sort_key) \
         .rowsBetween(-sys.maxsize, 0)

    filled_column = last(col(target), ignorenulls=True).over(window)

    return filled_column


def bfill(
    target: str,
    partition: Union[str, List[str]],
    sort_key: str,
) -> pyspark.sql.Column:
    """
    backward fill
    
    Args:
        target:Colonne pour laquelle la valeur nulle est remplie à nouveau
        partition:Colonne pour regrouper les enregistrements (liste pour plusieurs)
        sort_key:Colonne de détermination de la commande
    """
    window = Window.partitionBy(partition) \
         .orderBy(sort_key) \
         .rowsBetween(0, sys.maxsize)

    filled_column = first(col(target), ignorenulls=True).over(window)

    return filled_column

J'ai écrit un aperçu avec docstring, mais c'est un bref supplément

Exemple 1

#Préparation des données de test
test = pd.DataFrame({
    "id": ['A']*10 + ['B']*10,
    "timestamp": pd.date_range(start="2020-08-12T15:30:00",periods=20, freq='1s'),
    "value": [0, None, None, 3, None, 5, 3, None, None, 2, None, 4, 2, None, None, 9, 2, 8, None, None]
})
df_test = spark.createDataFrame(test) \
    .replace(np.nan, None) #S'il s'agit d'un type numérique`NaN`Sera stocké, remplacez-le par null

df_test.show()
# +---+-------------------+-----+
# | id|          timestamp|value|
# +---+-------------------+-----+
# |  A|2020-08-12 15:30:00|  0.0|
# |  A|2020-08-12 15:30:01| null|
# |  A|2020-08-12 15:30:02| null|
# |  A|2020-08-12 15:30:03|  3.0|
# |  A|2020-08-12 15:30:04| null|
# |  A|2020-08-12 15:30:05|  5.0|
# |  A|2020-08-12 15:30:06|  3.0|
# |  A|2020-08-12 15:30:07| null|
# |  A|2020-08-12 15:30:08| null|
# |  A|2020-08-12 15:30:09|  2.0|
# |  B|2020-08-12 15:30:10| null|
# |  B|2020-08-12 15:30:11|  4.0|
# |  B|2020-08-12 15:30:12|  2.0|
# |  B|2020-08-12 15:30:13| null|
# |  B|2020-08-12 15:30:14| null|
# |  B|2020-08-12 15:30:15|  9.0|
# |  B|2020-08-12 15:30:16|  2.0|
# |  B|2020-08-12 15:30:17|  8.0|
# |  B|2020-08-12 15:30:18| null|
# |  B|2020-08-12 15:30:19| null|
# +---+-------------------+-----+

#Pratiquez la complétion nulle en utilisant la fonction créée précédemment
df_test \
    .withColumn(
    "ffill",
    ffill(target="value", partition="id", sort_key="timestamp")
) \
    .withColumn(
    "bfill",
    bfill(target="value", partition="id", sort_key="timestamp")
) \
    .show()
# +---+-------------------+-----+------------+-------------+
# | id|          timestamp|value|forward fill|backward fill|
# +---+-------------------+-----+------------+-------------+
# |  B|2020-08-12 15:30:10| null|        null|          4.0|
# |  B|2020-08-12 15:30:11|  4.0|         4.0|          4.0|
# |  B|2020-08-12 15:30:12|  2.0|         2.0|          2.0|
# |  B|2020-08-12 15:30:13| null|         2.0|          9.0|
# |  B|2020-08-12 15:30:14| null|         2.0|          9.0|
# |  B|2020-08-12 15:30:15|  9.0|         9.0|          9.0|
# |  B|2020-08-12 15:30:16|  2.0|         2.0|          2.0|
# |  B|2020-08-12 15:30:17|  8.0|         8.0|          8.0|
# |  B|2020-08-12 15:30:18| null|         8.0|         null|
# |  B|2020-08-12 15:30:19| null|         8.0|         null|
# |  A|2020-08-12 15:30:00|  0.0|         0.0|          0.0|
# |  A|2020-08-12 15:30:01| null|         0.0|          3.0|
# |  A|2020-08-12 15:30:02| null|         0.0|          3.0|
# |  A|2020-08-12 15:30:03|  3.0|         3.0|          3.0|
# |  A|2020-08-12 15:30:04| null|         3.0|          5.0|
# |  A|2020-08-12 15:30:05|  5.0|         5.0|          5.0|
# |  A|2020-08-12 15:30:06|  3.0|         3.0|          3.0|
# |  A|2020-08-12 15:30:07| null|         3.0|          2.0|
# |  A|2020-08-12 15:30:08| null|         3.0|          2.0|
# |  A|2020-08-12 15:30:09|  2.0|         2.0|          2.0|
# +---+-------------------+-----+------------+-------------+

――C'est un peu difficile à voir, mais il est séparé par ʻid: A, B --Trier par horodatage dans chaque délimiteur ʻid et remplir et remplir la valeur manquante de valeur --Comparer valeur avec remplissage avant et remplissage arrière

Exemple 2

#Préparation des données de test (2)
test2 = pd.DataFrame({
    "key1": ['A']*10 + ['B']*10,
    "key2": [1, 2]*10,
    "timestamp": pd.date_range(start="2020-08-12T15:30:00",periods=20, freq='1s'),
    "value": ["foo", None, None, "bar", None, "hoge", "foofoo", None, None, "foobar", None, "aaa", "bbb", None, None, "ccc", "xxx", "zzz", None, None]
})
df_test2 = spark.createDataFrame(test2)

df_test2.show()
# +----+----+-------------------+------+
# |key1|key2|          timestamp| value|
# +----+----+-------------------+------+
# |   A|   1|2020-08-12 15:30:00|   foo|
# |   A|   2|2020-08-12 15:30:01|  null|
# |   A|   1|2020-08-12 15:30:02|  null|
# |   A|   2|2020-08-12 15:30:03|   bar|
# |   A|   1|2020-08-12 15:30:04|  null|
# |   A|   2|2020-08-12 15:30:05|  hoge|
# |   A|   1|2020-08-12 15:30:06|foofoo|
# |   A|   2|2020-08-12 15:30:07|  null|
# |   A|   1|2020-08-12 15:30:08|  null|
# |   A|   2|2020-08-12 15:30:09|foobar|
# |   B|   1|2020-08-12 15:30:10|  null|
# |   B|   2|2020-08-12 15:30:11|   aaa|
# |   B|   1|2020-08-12 15:30:12|   bbb|
# |   B|   2|2020-08-12 15:30:13|  null|
# |   B|   1|2020-08-12 15:30:14|  null|
# |   B|   2|2020-08-12 15:30:15|   ccc|
# |   B|   1|2020-08-12 15:30:16|   xxx|
# |   B|   2|2020-08-12 15:30:17|   zzz|
# |   B|   1|2020-08-12 15:30:18|  null|
# |   B|   2|2020-08-12 15:30:19|  null|
# +----+----+-------------------+------+

#Pratiquez la complétion nulle en utilisant la fonction créée précédemment
df_test2 \
    .withColumn(
    "forward fill",
    ffill(target="value", partition=["key1", "key2"], sort_key="timestamp")
) \
    .withColumn(
    "backward fill",
    bfill(target="value", partition=["key1", "key2"], sort_key="timestamp")
) \
    .show()
# +----+----+-------------------+------+------------+-------------+
# |key1|key2|          timestamp| value|forward fill|backward fill|
# +----+----+-------------------+------+------------+-------------+
# |   B|   1|2020-08-12 15:30:10|  null|        null|          bbb|
# |   B|   1|2020-08-12 15:30:12|   bbb|         bbb|          bbb|
# |   B|   1|2020-08-12 15:30:14|  null|         bbb|          xxx|
# |   B|   1|2020-08-12 15:30:16|   xxx|         xxx|          xxx|
# |   B|   1|2020-08-12 15:30:18|  null|         xxx|         null|
# |   A|   2|2020-08-12 15:30:01|  null|        null|          bar|
# |   A|   2|2020-08-12 15:30:03|   bar|         bar|          bar|
# |   A|   2|2020-08-12 15:30:05|  hoge|        hoge|         hoge|
# |   A|   2|2020-08-12 15:30:07|  null|        hoge|       foobar|
# |   A|   2|2020-08-12 15:30:09|foobar|      foobar|       foobar|
# |   A|   1|2020-08-12 15:30:00|   foo|         foo|          foo|
# |   A|   1|2020-08-12 15:30:02|  null|         foo|       foofoo|
# |   A|   1|2020-08-12 15:30:04|  null|         foo|       foofoo|
# |   A|   1|2020-08-12 15:30:06|foofoo|      foofoo|       foofoo|
# |   A|   1|2020-08-12 15:30:08|  null|      foofoo|         null|
# |   B|   2|2020-08-12 15:30:11|   aaa|         aaa|          aaa|
# |   B|   2|2020-08-12 15:30:13|  null|         aaa|          ccc|
# |   B|   2|2020-08-12 15:30:15|   ccc|         ccc|          ccc|
# |   B|   2|2020-08-12 15:30:17|   zzz|         zzz|          zzz|
# |   B|   2|2020-08-12 15:30:19|  null|         zzz|         null|
# +----+----+-------------------+------+------------+-------------+

Supplément

Concernant la valeur de retour de la fonction créée dans l'exemple ci-dessus

display(ffill(target="value", partition="id", sort_key="timestamp"))
# Column<b'last(value, true) OVER (PARTITION BY id ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)'>

display(bfill(target="value", partition="id", sort_key="timestamp"))
# Column<b'first(value, true) OVER (PARTITION BY id ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)'>

display(ffill(target="value", partition=["key1", "key2"], sort_key="timestamp"))
# Column<b'last(value, true) OVER (PARTITION BY key1, key2 ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)'>

display(bfill(target="value", partition=["key1", "key2"], sort_key="timestamp"))
# Column<b'first(value, true) OVER (PARTITION BY key1, key2 ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)'>