[PYTHON] Füllen Sie den fehlenden Wert (null) von DataFrame mit den Werten davor und danach mit pyspark

Überblick

In pyspark (Spark SQL) ffill (Forward Fill) und [bfill] in pandas ](Https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.bfill.html) (Rückwärtsfüllung) ist standardmäßig nicht vorhanden. Wenn daher eine enge Verarbeitung erforderlich ist, müssen Sie diese selbst erstellen. (Persönliches Memo)

Referenzen (Antwort)

Es scheint, dass Sie es wie der obige Link tun sollten

Praktisches Beispiel

Die Referenzen haben das Problem fast gelöst, aber ich werde es versuchen.

Überprüfungsumgebung:

Vorbereitung

#Import der benötigten Bibliotheken
import sys
from typing import (
    Union,
    List,
)

import numpy as np
import pandas as pd
import pyspark.sql
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    first,
    last,
)
from pyspark.sql.window import Window

#Funken-Sitzung generieren
spark = SparkSession.builder.getOrCreate()

numpy, pandas sind hauptsächlich für die Erstellung von Testdaten enthalten

Implementieren Sie ffill und bfill in function

def ffill(
    target: str,
    partition: Union[str, List[str]],
    sort_key: str,
) -> pyspark.sql.Column:
    """
    forward fill
    
    Args:
        target:Spalte, für die der Nullwert vorwärts gefüllt ist
        partition:Spalte zum Gruppieren von Datensätzen (Liste für mehrere)
        sort_key:Spalte zur Bestimmung der Reihenfolge
    """
    window = Window.partitionBy(partition) \
         .orderBy(sort_key) \
         .rowsBetween(-sys.maxsize, 0)

    filled_column = last(col(target), ignorenulls=True).over(window)

    return filled_column


def bfill(
    target: str,
    partition: Union[str, List[str]],
    sort_key: str,
) -> pyspark.sql.Column:
    """
    backward fill
    
    Args:
        target:Spalte, für die der Nullwert wieder ausgefüllt ist
        partition:Spalte zum Gruppieren von Datensätzen (Liste für mehrere)
        sort_key:Spalte zur Bestimmung der Reihenfolge
    """
    window = Window.partitionBy(partition) \
         .orderBy(sort_key) \
         .rowsBetween(0, sys.maxsize)

    filled_column = first(col(target), ignorenulls=True).over(window)

    return filled_column

Ich habe eine Übersicht mit docstring geschrieben, aber es ist eine kurze Ergänzung

Beispiel 1

#Vorbereitung der Testdaten
test = pd.DataFrame({
    "id": ['A']*10 + ['B']*10,
    "timestamp": pd.date_range(start="2020-08-12T15:30:00",periods=20, freq='1s'),
    "value": [0, None, None, 3, None, 5, 3, None, None, 2, None, 4, 2, None, None, 9, 2, 8, None, None]
})
df_test = spark.createDataFrame(test) \
    .replace(np.nan, None) #Wenn es sich um einen numerischen Typ handelt`NaN`Wird gespeichert, ersetzen Sie es durch null

df_test.show()
# +---+-------------------+-----+
# | id|          timestamp|value|
# +---+-------------------+-----+
# |  A|2020-08-12 15:30:00|  0.0|
# |  A|2020-08-12 15:30:01| null|
# |  A|2020-08-12 15:30:02| null|
# |  A|2020-08-12 15:30:03|  3.0|
# |  A|2020-08-12 15:30:04| null|
# |  A|2020-08-12 15:30:05|  5.0|
# |  A|2020-08-12 15:30:06|  3.0|
# |  A|2020-08-12 15:30:07| null|
# |  A|2020-08-12 15:30:08| null|
# |  A|2020-08-12 15:30:09|  2.0|
# |  B|2020-08-12 15:30:10| null|
# |  B|2020-08-12 15:30:11|  4.0|
# |  B|2020-08-12 15:30:12|  2.0|
# |  B|2020-08-12 15:30:13| null|
# |  B|2020-08-12 15:30:14| null|
# |  B|2020-08-12 15:30:15|  9.0|
# |  B|2020-08-12 15:30:16|  2.0|
# |  B|2020-08-12 15:30:17|  8.0|
# |  B|2020-08-12 15:30:18| null|
# |  B|2020-08-12 15:30:19| null|
# +---+-------------------+-----+

#Üben Sie die Null-Vervollständigung mit der zuvor erstellten Funktion
df_test \
    .withColumn(
    "ffill",
    ffill(target="value", partition="id", sort_key="timestamp")
) \
    .withColumn(
    "bfill",
    bfill(target="value", partition="id", sort_key="timestamp")
) \
    .show()
# +---+-------------------+-----+------------+-------------+
# | id|          timestamp|value|forward fill|backward fill|
# +---+-------------------+-----+------------+-------------+
# |  B|2020-08-12 15:30:10| null|        null|          4.0|
# |  B|2020-08-12 15:30:11|  4.0|         4.0|          4.0|
# |  B|2020-08-12 15:30:12|  2.0|         2.0|          2.0|
# |  B|2020-08-12 15:30:13| null|         2.0|          9.0|
# |  B|2020-08-12 15:30:14| null|         2.0|          9.0|
# |  B|2020-08-12 15:30:15|  9.0|         9.0|          9.0|
# |  B|2020-08-12 15:30:16|  2.0|         2.0|          2.0|
# |  B|2020-08-12 15:30:17|  8.0|         8.0|          8.0|
# |  B|2020-08-12 15:30:18| null|         8.0|         null|
# |  B|2020-08-12 15:30:19| null|         8.0|         null|
# |  A|2020-08-12 15:30:00|  0.0|         0.0|          0.0|
# |  A|2020-08-12 15:30:01| null|         0.0|          3.0|
# |  A|2020-08-12 15:30:02| null|         0.0|          3.0|
# |  A|2020-08-12 15:30:03|  3.0|         3.0|          3.0|
# |  A|2020-08-12 15:30:04| null|         3.0|          5.0|
# |  A|2020-08-12 15:30:05|  5.0|         5.0|          5.0|
# |  A|2020-08-12 15:30:06|  3.0|         3.0|          3.0|
# |  A|2020-08-12 15:30:07| null|         3.0|          2.0|
# |  A|2020-08-12 15:30:08| null|         3.0|          2.0|
# |  A|2020-08-12 15:30:09|  2.0|         2.0|          2.0|
# +---+-------------------+-----+------------+-------------+

Beispiel 2

#Vorbereitung der Testdaten (2)
test2 = pd.DataFrame({
    "key1": ['A']*10 + ['B']*10,
    "key2": [1, 2]*10,
    "timestamp": pd.date_range(start="2020-08-12T15:30:00",periods=20, freq='1s'),
    "value": ["foo", None, None, "bar", None, "hoge", "foofoo", None, None, "foobar", None, "aaa", "bbb", None, None, "ccc", "xxx", "zzz", None, None]
})
df_test2 = spark.createDataFrame(test2)

df_test2.show()
# +----+----+-------------------+------+
# |key1|key2|          timestamp| value|
# +----+----+-------------------+------+
# |   A|   1|2020-08-12 15:30:00|   foo|
# |   A|   2|2020-08-12 15:30:01|  null|
# |   A|   1|2020-08-12 15:30:02|  null|
# |   A|   2|2020-08-12 15:30:03|   bar|
# |   A|   1|2020-08-12 15:30:04|  null|
# |   A|   2|2020-08-12 15:30:05|  hoge|
# |   A|   1|2020-08-12 15:30:06|foofoo|
# |   A|   2|2020-08-12 15:30:07|  null|
# |   A|   1|2020-08-12 15:30:08|  null|
# |   A|   2|2020-08-12 15:30:09|foobar|
# |   B|   1|2020-08-12 15:30:10|  null|
# |   B|   2|2020-08-12 15:30:11|   aaa|
# |   B|   1|2020-08-12 15:30:12|   bbb|
# |   B|   2|2020-08-12 15:30:13|  null|
# |   B|   1|2020-08-12 15:30:14|  null|
# |   B|   2|2020-08-12 15:30:15|   ccc|
# |   B|   1|2020-08-12 15:30:16|   xxx|
# |   B|   2|2020-08-12 15:30:17|   zzz|
# |   B|   1|2020-08-12 15:30:18|  null|
# |   B|   2|2020-08-12 15:30:19|  null|
# +----+----+-------------------+------+

#Üben Sie die Null-Vervollständigung mit der zuvor erstellten Funktion
df_test2 \
    .withColumn(
    "forward fill",
    ffill(target="value", partition=["key1", "key2"], sort_key="timestamp")
) \
    .withColumn(
    "backward fill",
    bfill(target="value", partition=["key1", "key2"], sort_key="timestamp")
) \
    .show()
# +----+----+-------------------+------+------------+-------------+
# |key1|key2|          timestamp| value|forward fill|backward fill|
# +----+----+-------------------+------+------------+-------------+
# |   B|   1|2020-08-12 15:30:10|  null|        null|          bbb|
# |   B|   1|2020-08-12 15:30:12|   bbb|         bbb|          bbb|
# |   B|   1|2020-08-12 15:30:14|  null|         bbb|          xxx|
# |   B|   1|2020-08-12 15:30:16|   xxx|         xxx|          xxx|
# |   B|   1|2020-08-12 15:30:18|  null|         xxx|         null|
# |   A|   2|2020-08-12 15:30:01|  null|        null|          bar|
# |   A|   2|2020-08-12 15:30:03|   bar|         bar|          bar|
# |   A|   2|2020-08-12 15:30:05|  hoge|        hoge|         hoge|
# |   A|   2|2020-08-12 15:30:07|  null|        hoge|       foobar|
# |   A|   2|2020-08-12 15:30:09|foobar|      foobar|       foobar|
# |   A|   1|2020-08-12 15:30:00|   foo|         foo|          foo|
# |   A|   1|2020-08-12 15:30:02|  null|         foo|       foofoo|
# |   A|   1|2020-08-12 15:30:04|  null|         foo|       foofoo|
# |   A|   1|2020-08-12 15:30:06|foofoo|      foofoo|       foofoo|
# |   A|   1|2020-08-12 15:30:08|  null|      foofoo|         null|
# |   B|   2|2020-08-12 15:30:11|   aaa|         aaa|          aaa|
# |   B|   2|2020-08-12 15:30:13|  null|         aaa|          ccc|
# |   B|   2|2020-08-12 15:30:15|   ccc|         ccc|          ccc|
# |   B|   2|2020-08-12 15:30:17|   zzz|         zzz|          zzz|
# |   B|   2|2020-08-12 15:30:19|  null|         zzz|         null|
# +----+----+-------------------+------+------------+-------------+

Ergänzung

Bezüglich des Rückgabewerts der im obigen Beispiel erstellten Funktion

display(ffill(target="value", partition="id", sort_key="timestamp"))
# Column<b'last(value, true) OVER (PARTITION BY id ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)'>

display(bfill(target="value", partition="id", sort_key="timestamp"))
# Column<b'first(value, true) OVER (PARTITION BY id ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)'>

display(ffill(target="value", partition=["key1", "key2"], sort_key="timestamp"))
# Column<b'last(value, true) OVER (PARTITION BY key1, key2 ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)'>

display(bfill(target="value", partition=["key1", "key2"], sort_key="timestamp"))
# Column<b'first(value, true) OVER (PARTITION BY key1, key2 ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)'>