In pyspark (Spark SQL) ffill (Forward Fill) und [bfill] in pandas
](Https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.bfill.html) (Rückwärtsfüllung) ist standardmäßig nicht vorhanden.
Wenn daher eine enge Verarbeitung erforderlich ist, müssen Sie diese selbst erstellen. (Persönliches Memo)
Es scheint, dass Sie es wie der obige Link tun sollten
Die Referenzen haben das Problem fast gelöst, aber ich werde es versuchen.
Überprüfungsumgebung:
#Import der benötigten Bibliotheken
import sys
from typing import (
Union,
List,
)
import numpy as np
import pandas as pd
import pyspark.sql
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
col,
first,
last,
)
from pyspark.sql.window import Window
#Funken-Sitzung generieren
spark = SparkSession.builder.getOrCreate()
numpy
, pandas
sind hauptsächlich für die Erstellung von Testdaten enthalten
def ffill(
target: str,
partition: Union[str, List[str]],
sort_key: str,
) -> pyspark.sql.Column:
"""
forward fill
Args:
target:Spalte, für die der Nullwert vorwärts gefüllt ist
partition:Spalte zum Gruppieren von Datensätzen (Liste für mehrere)
sort_key:Spalte zur Bestimmung der Reihenfolge
"""
window = Window.partitionBy(partition) \
.orderBy(sort_key) \
.rowsBetween(-sys.maxsize, 0)
filled_column = last(col(target), ignorenulls=True).over(window)
return filled_column
def bfill(
target: str,
partition: Union[str, List[str]],
sort_key: str,
) -> pyspark.sql.Column:
"""
backward fill
Args:
target:Spalte, für die der Nullwert wieder ausgefüllt ist
partition:Spalte zum Gruppieren von Datensätzen (Liste für mehrere)
sort_key:Spalte zur Bestimmung der Reihenfolge
"""
window = Window.partitionBy(partition) \
.orderBy(sort_key) \
.rowsBetween(0, sys.maxsize)
filled_column = first(col(target), ignorenulls=True).over(window)
return filled_column
Ich habe eine Übersicht mit docstring geschrieben, aber es ist eine kurze Ergänzung
window
:sys.maxsize
bedeutet praktisch unendlichrowsBetween (start, end)
ist das Ziel der Verarbeitung Erstellen Sie einen Bereich, der die Datensätze von "Start" vor bis "Ende" nach dem Datensatz enthält.sys.maxsize
im Fall von ffill" vom ersten Datensatz zu dem Datensatz, der in dem durch partitionBy getrennten Bereich verarbeitet werden soll "und im Fall von bfill" vom Datensatz, der in dem durch partitionBy getrennten Bereich verarbeitet werden soll "verwendet wird. Bezieht sich auf den Bereich "bis zum letzten Datensatz"filled_column
--Wenden Sie die Verarbeitung mit der Fensterfunktion von window
oben auf alle Datensätze an
--Fill / bfill mit last
/ first
, die letzte / erste Null im Fensterbereich Gibt einen Wert zurück, der nicht ist#Vorbereitung der Testdaten
test = pd.DataFrame({
"id": ['A']*10 + ['B']*10,
"timestamp": pd.date_range(start="2020-08-12T15:30:00",periods=20, freq='1s'),
"value": [0, None, None, 3, None, 5, 3, None, None, 2, None, 4, 2, None, None, 9, 2, 8, None, None]
})
df_test = spark.createDataFrame(test) \
.replace(np.nan, None) #Wenn es sich um einen numerischen Typ handelt`NaN`Wird gespeichert, ersetzen Sie es durch null
df_test.show()
# +---+-------------------+-----+
# | id| timestamp|value|
# +---+-------------------+-----+
# | A|2020-08-12 15:30:00| 0.0|
# | A|2020-08-12 15:30:01| null|
# | A|2020-08-12 15:30:02| null|
# | A|2020-08-12 15:30:03| 3.0|
# | A|2020-08-12 15:30:04| null|
# | A|2020-08-12 15:30:05| 5.0|
# | A|2020-08-12 15:30:06| 3.0|
# | A|2020-08-12 15:30:07| null|
# | A|2020-08-12 15:30:08| null|
# | A|2020-08-12 15:30:09| 2.0|
# | B|2020-08-12 15:30:10| null|
# | B|2020-08-12 15:30:11| 4.0|
# | B|2020-08-12 15:30:12| 2.0|
# | B|2020-08-12 15:30:13| null|
# | B|2020-08-12 15:30:14| null|
# | B|2020-08-12 15:30:15| 9.0|
# | B|2020-08-12 15:30:16| 2.0|
# | B|2020-08-12 15:30:17| 8.0|
# | B|2020-08-12 15:30:18| null|
# | B|2020-08-12 15:30:19| null|
# +---+-------------------+-----+
#Üben Sie die Null-Vervollständigung mit der zuvor erstellten Funktion
df_test \
.withColumn(
"ffill",
ffill(target="value", partition="id", sort_key="timestamp")
) \
.withColumn(
"bfill",
bfill(target="value", partition="id", sort_key="timestamp")
) \
.show()
# +---+-------------------+-----+------------+-------------+
# | id| timestamp|value|forward fill|backward fill|
# +---+-------------------+-----+------------+-------------+
# | B|2020-08-12 15:30:10| null| null| 4.0|
# | B|2020-08-12 15:30:11| 4.0| 4.0| 4.0|
# | B|2020-08-12 15:30:12| 2.0| 2.0| 2.0|
# | B|2020-08-12 15:30:13| null| 2.0| 9.0|
# | B|2020-08-12 15:30:14| null| 2.0| 9.0|
# | B|2020-08-12 15:30:15| 9.0| 9.0| 9.0|
# | B|2020-08-12 15:30:16| 2.0| 2.0| 2.0|
# | B|2020-08-12 15:30:17| 8.0| 8.0| 8.0|
# | B|2020-08-12 15:30:18| null| 8.0| null|
# | B|2020-08-12 15:30:19| null| 8.0| null|
# | A|2020-08-12 15:30:00| 0.0| 0.0| 0.0|
# | A|2020-08-12 15:30:01| null| 0.0| 3.0|
# | A|2020-08-12 15:30:02| null| 0.0| 3.0|
# | A|2020-08-12 15:30:03| 3.0| 3.0| 3.0|
# | A|2020-08-12 15:30:04| null| 3.0| 5.0|
# | A|2020-08-12 15:30:05| 5.0| 5.0| 5.0|
# | A|2020-08-12 15:30:06| 3.0| 3.0| 3.0|
# | A|2020-08-12 15:30:07| null| 3.0| 2.0|
# | A|2020-08-12 15:30:08| null| 3.0| 2.0|
# | A|2020-08-12 15:30:09| 2.0| 2.0| 2.0|
# +---+-------------------+-----+------------+-------------+
#Vorbereitung der Testdaten (2)
test2 = pd.DataFrame({
"key1": ['A']*10 + ['B']*10,
"key2": [1, 2]*10,
"timestamp": pd.date_range(start="2020-08-12T15:30:00",periods=20, freq='1s'),
"value": ["foo", None, None, "bar", None, "hoge", "foofoo", None, None, "foobar", None, "aaa", "bbb", None, None, "ccc", "xxx", "zzz", None, None]
})
df_test2 = spark.createDataFrame(test2)
df_test2.show()
# +----+----+-------------------+------+
# |key1|key2| timestamp| value|
# +----+----+-------------------+------+
# | A| 1|2020-08-12 15:30:00| foo|
# | A| 2|2020-08-12 15:30:01| null|
# | A| 1|2020-08-12 15:30:02| null|
# | A| 2|2020-08-12 15:30:03| bar|
# | A| 1|2020-08-12 15:30:04| null|
# | A| 2|2020-08-12 15:30:05| hoge|
# | A| 1|2020-08-12 15:30:06|foofoo|
# | A| 2|2020-08-12 15:30:07| null|
# | A| 1|2020-08-12 15:30:08| null|
# | A| 2|2020-08-12 15:30:09|foobar|
# | B| 1|2020-08-12 15:30:10| null|
# | B| 2|2020-08-12 15:30:11| aaa|
# | B| 1|2020-08-12 15:30:12| bbb|
# | B| 2|2020-08-12 15:30:13| null|
# | B| 1|2020-08-12 15:30:14| null|
# | B| 2|2020-08-12 15:30:15| ccc|
# | B| 1|2020-08-12 15:30:16| xxx|
# | B| 2|2020-08-12 15:30:17| zzz|
# | B| 1|2020-08-12 15:30:18| null|
# | B| 2|2020-08-12 15:30:19| null|
# +----+----+-------------------+------+
#Üben Sie die Null-Vervollständigung mit der zuvor erstellten Funktion
df_test2 \
.withColumn(
"forward fill",
ffill(target="value", partition=["key1", "key2"], sort_key="timestamp")
) \
.withColumn(
"backward fill",
bfill(target="value", partition=["key1", "key2"], sort_key="timestamp")
) \
.show()
# +----+----+-------------------+------+------------+-------------+
# |key1|key2| timestamp| value|forward fill|backward fill|
# +----+----+-------------------+------+------------+-------------+
# | B| 1|2020-08-12 15:30:10| null| null| bbb|
# | B| 1|2020-08-12 15:30:12| bbb| bbb| bbb|
# | B| 1|2020-08-12 15:30:14| null| bbb| xxx|
# | B| 1|2020-08-12 15:30:16| xxx| xxx| xxx|
# | B| 1|2020-08-12 15:30:18| null| xxx| null|
# | A| 2|2020-08-12 15:30:01| null| null| bar|
# | A| 2|2020-08-12 15:30:03| bar| bar| bar|
# | A| 2|2020-08-12 15:30:05| hoge| hoge| hoge|
# | A| 2|2020-08-12 15:30:07| null| hoge| foobar|
# | A| 2|2020-08-12 15:30:09|foobar| foobar| foobar|
# | A| 1|2020-08-12 15:30:00| foo| foo| foo|
# | A| 1|2020-08-12 15:30:02| null| foo| foofoo|
# | A| 1|2020-08-12 15:30:04| null| foo| foofoo|
# | A| 1|2020-08-12 15:30:06|foofoo| foofoo| foofoo|
# | A| 1|2020-08-12 15:30:08| null| foofoo| null|
# | B| 2|2020-08-12 15:30:11| aaa| aaa| aaa|
# | B| 2|2020-08-12 15:30:13| null| aaa| ccc|
# | B| 2|2020-08-12 15:30:15| ccc| ccc| ccc|
# | B| 2|2020-08-12 15:30:17| zzz| zzz| zzz|
# | B| 2|2020-08-12 15:30:19| null| zzz| null|
# +----+----+-------------------+------+------------+-------------+
Bezüglich des Rückgabewerts der im obigen Beispiel erstellten Funktion
display(ffill(target="value", partition="id", sort_key="timestamp"))
# Column<b'last(value, true) OVER (PARTITION BY id ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)'>
display(bfill(target="value", partition="id", sort_key="timestamp"))
# Column<b'first(value, true) OVER (PARTITION BY id ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)'>
display(ffill(target="value", partition=["key1", "key2"], sort_key="timestamp"))
# Column<b'last(value, true) OVER (PARTITION BY key1, key2 ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)'>
display(bfill(target="value", partition=["key1", "key2"], sort_key="timestamp"))
# Column<b'first(value, true) OVER (PARTITION BY key1, key2 ORDER BY timestamp ASC NULLS FIRST ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)'>
pyspark.sql
verwenden, um basierend auf den Argumenten eine entsprechende SQL-Anweisung zu generieren.
--Gegeben Sie einen DataFrame, der "