Dieser Artikel basiert auf dem Inhalt der Apache Beam-Dokumentation (https://beam.apache.org/documentation/).
Es implementiert ein Stapelverarbeitungsprogramm mit Apache Beam Python SDK und fasst die Prozedur und Methode zur Ausführung mit Cloud Dataflow zusammen. Es werden auch die Grundkonzepte von Apache Beam, Testen und Design behandelt.
Das Apache Beam SDK kann aus ** Java **, ** Python **, ** Go ** ausgewählt werden und bietet die folgenden ** Funktionen, die den verteilten Verarbeitungsmechanismus ** vereinfachen. tun.
Vom Apache Beam SDK erstellte Programme können auf verteilten Datenverarbeitungssystemen ausgeführt werden, z. In Apache Beam heißt diese Ausführungsumgebung ** Runner **.
Dieses Mal werden wir es in zwei Ausführungsumgebungen ausführen, DirectRunner und DataflowRunner.
Ein allgemeines (einfaches) Apache Beam-Programm wird erstellt und funktioniert wie folgt.
Für diesen Prozessablauf wäre die Pipeline wie folgt:
Lassen Sie uns tatsächlich eine einfache Pipeline wie die oben in Python implementieren. Die Betriebsumgebung wird wie folgt angenommen.
Wenn Sie keine zusätzlichen Pakete benötigen, installieren Sie diese mit dem folgenden Befehl:
pip install apache-beam
Dieses Mal gehen wir davon aus, dass es auf Dataflow (GCP) ausgeführt wird, daher werden wir auch zusätzliche GCP-Pakete installieren.
pip install apache-beam[gcp]
Dies ist der vollständige Code. Ich werde jeden von ihnen unten erklären.
pipeline.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
class MyOptions(PipelineOptions):
"""Benutzerdefinierte Optionen."""
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--input',
default='./input.txt',
help='Input path for the pipeline')
parser.add_argument(
'--output',
default='./output.txt',
help='Output path for the pipeline')
class ComputeWordLength(beam.DoFn):
"""Konvertierungsprozess zum Ermitteln der Anzahl der Zeichen."""
def __init__(self):
pass
def process(self, element):
yield len(element)
def run():
options = MyOptions()
# options.view_as(StandardOptions).runner = 'DirectRunner'
p = beam.Pipeline(options=options)
(p
| 'ReadFromText' >> beam.io.ReadFromText(options.input) # I/Wenden Sie O Transform an, um Daten in den optionalen Pfad zu laden
| 'ComputeWordLength' >> beam.ParDo(ComputeWordLength()) #Transform anwenden anwenden
| 'WriteToText' >> beam.io.WriteToText(options.output)) # I/Wenden Sie O Transform an und schreiben Sie Daten in den optionalen Pfad
p.run()
if __name__ == '__main__':
run()
Pipeline Das Pipeline-Objekt ** kapselt alle Ihre Datenverarbeitungsaufgaben **. Apache Beam-Programme erstellen normalerweise zuerst ein Pipeline-Objekt, um eine PCollection zu erstellen und eine Transformation anzuwenden.
Um das Apache Beam-Programm verwenden zu können, müssen Sie zuerst eine Instanz der Apache Beam SDK-Pipeline erstellen (normalerweise innerhalb der Hauptfunktion). Wenn Sie dann die Pipeline erstellen, legen Sie die Ausführungsoptionen fest.
Der folgende Code ist ein Beispiel für die Erstellung einer Pipeline-Instanz.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
options = PipelineOptions() #Ausführungsoptionen
p = beam.Pipeline(options=options)
Mit PipelineOptions können Sie die Läufer festlegen, auf denen die Pipeline ausgeführt wird, und ** spezifische Optionen, die für den ausgewählten Läufer ** erforderlich sind. Beispielsweise kann es Informationen wie die Projekt-ID und den Speicherort der Datei enthalten.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DirectRunner' #Bezeichnung des Läufers
p = beam.Pipeline(options=options)
Es gibt zwei Möglichkeiten: Die eine besteht darin, sie programmgesteuert festzulegen, und die andere darin, sie über ein Befehlszeilenargument zu übergeben. Ein Beispiel wird unten unter [Mit Cloud-Datenfluss ausführen](Ausführen mit # Cloud-Datenfluss-) beschrieben.
Sie können zusätzlich zu den Standard-PipelineOptions ** benutzerdefinierte Optionen ** hinzufügen. Im folgenden Beispiel wird eine Option zum Angeben der Eingabe- und Ausgabepfade hinzugefügt. Mit benutzerdefinierten Optionen können Sie auch eine Beschreibung oder einen Standardwert angeben, die angezeigt werden, wenn der Benutzer "--help" über ein Befehlszeilenargument übergibt.
Sie können benutzerdefinierte Optionen erstellen, indem Sie ** von PipelineOptions erben **.
class MyOptions(PipelineOptions):
"""Benutzerdefinierte Optionen."""
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--input', #Optionsname
default='./input.txt', #Standardwert
help='Input path for the pipeline') #Erläuterung
parser.add_argument(
'--output',
default='./output.txt',
help='Output path for the pipeline')
Übergeben Sie die von Ihnen erstellten Optionen wie folgt:
p = beam.Pipeline(options=MyOptions())
Übergeben Sie einen Wert aus dem Befehlszeilenargument wie folgt, um eine benutzerdefinierte Option auf einen anderen Wert als den Standardwert festzulegen:
--input=value --output==value
PCollection Eine PCollection ist ein ** Objekt, das einen zu verteilenden Datensatz darstellt **. In der Apache Beam-Pipeline verwendet Transform PCollection als Eingabe und Ausgabe. Wenn Sie die Daten in der Pipeline verarbeiten möchten, müssen Sie daher eine PCollection erstellen.
Nach dem Erstellen eines Pipeline-Objekts müssen Sie zunächst mindestens eine PC-Sammlung erstellen.
Verwenden Sie die E / A-Transformation, um Daten von einer externen Quelle zu lesen oder eine PC-Sammlung aus dem Arbeitsspeicher zu erstellen. Letzteres ist hauptsächlich zum Testen und Debuggen nützlich.
Verwenden Sie die E / A-Transformation, um eine PC-Sammlung aus einer externen Quelle zu erstellen. Wenden Sie zum Lesen der Daten die von jeder E / A-Transformation bereitgestellte Lesetransformation auf das Pipeline-Objekt an.
So wenden Sie eine Lesetransformation auf eine Pipeline an, um eine PCollection zu erstellen:
lines = p | 'ReadFromText' >> beam.io.ReadFromText('gs://some/input-data.txt')
Verwenden Sie Create Transform, um eine PC-Sammlung aus dem Arbeitsspeicher zu erstellen.
lines = (p | 'ReadFromInMemory' >> beam.Create(['To be, or not to be: that is the question: ', 'Whether \'tis nobler in the mind to suffer ', 'The slings and arrows of outrageous fortune, ', 'Or to take arms against a sea of troubles, ']))
Transform Transform bietet ein ** allgemeines Verarbeitungsframework **. Die Transformation wird auf jedes Element der Eingabe-PC-Sammlung angewendet.
Das Apache Beam SDK bietet eine Vielzahl von Transformationen, die Sie auf Ihre PCollection anwenden können. Dies umfasst generische ** Core-Transformationen ** wie ParDo und Combine sowie ** Composite-Transformationen **, die eine oder mehrere Core-Transformationen kombinieren. Es werden verschiedene Transformationen bereitgestellt. Weitere Informationen finden Sie unter hier.
Jede Transformation im Apache Beam SDK enthält den Pipe-Operator |
, sodass Sie die Transformation anwenden können, indem Sie diese Methode auf die Eingabe-PCollection anwenden.
[Output PCollection] = [Input PCollection] | [Transform]
Sie können Transformationen auch verketten, um eine Pipeline wie folgt zu erstellen:
[Output PCollection] = ([Initial Input PCollection]
| [First Transform]
| [Second Transform]
| [Third Transform])
Diese Pipeline hat den gleichen Ablauf wie dieses Implementierungsbeispiel, sodass die Pipeline diese Form hat.
Transform erstellt eine neue PCollection, ohne Änderungen an der eingegebenen PCollection vorzunehmen. ** Transform ändert die Eingabe-PC-Sammlung nicht. ** PCollection ist per Definition unveränderlich. Daher können Sie mehrere Transformationen auf dieselbe PCollection anwenden, um die PCollection zu verzweigen.
[Output PCollection] = [Initial Input PCollection]
[Output PCollection A] = [Output PCollection] | [Transform A]
[Output PCollection B] = [Output PCollection] | [Transform B]
Die Form dieser Pipeline sieht folgendermaßen aus:
I/O Transform Wenn Sie eine Pipeline erstellen, müssen Sie häufig Daten aus einer externen Quelle wie einer Datei oder Datenbank lesen. Ebenso können Sie Daten aus der Pipeline an ein externes Speichersystem ausgeben.
Das Apache Beam SDK bietet eine E / A-Transformation für die allgemeinen Datenspeichertypen (https://beam.apache.org/documentation/io/built-in/). Wenn Sie nicht unterstützten Datenspeicher lesen oder schreiben möchten, müssen Sie Ihre eigene E / A-Transformation implementieren.
Read Transform transformiert gelesene Daten von einer externen Quelle in eine PCollection. Sie können die Lesetransformation jederzeit beim Erstellen der Pipeline verwenden, sie wird jedoch in der Regel zuerst ausgeführt.
lines = pipeline | beam.io.ReadFromText('gs://some/input-data.txt')
Write Transform schreibt die Daten in der PCollection in eine externe Datenquelle. Verwenden Sie zum Drucken der Ergebnisse einer Pipeline in den meisten Fällen die Schreibtransformation am Ende der Pipeline.
output | beam.io.WriteToText('gs://some/output-data')
Viele Lesetransformationen unterstützen das Lesen aus mehreren Eingabedateien, die dem Glob-Operator entsprechen. Im folgenden Beispiel wird der Glob-Operator (*) verwendet, um alle übereinstimmenden Eingabedateien mit dem Präfix "input-" und dem Suffix ".csv" am angegebenen Speicherort zu lesen.
lines = p | 'ReadFromText' >> beam.io.ReadFromText('path/to/input-*.csv')
Write Transform schreibt standardmäßig in mehrere Dateien. Dabei wird der Dateiname als Präfix für alle Ausgabedateien verwendet.
Im folgenden Beispiel werden mehrere Dateien an einen Speicherort geschrieben. Jeder Datei wird "Zahlen" vorangestellt und ".csv" angehängt.
output | 'WriteToText' >> beam.io.WriteToText('/path/to/numbers', file_name_suffix='.csv')
Lassen Sie uns nun die Pipeline mit Completed Code (#Completed Code) ausführen. Lokal ausführen und Cloud Dataflow als Ausführungsumgebung.
Bereiten Sie eine Textdatei vor, die die folgende Zeichenfolge zur Eingabe enthält.
input.txt
good morning.
good afternoon.
good evening.
Um die Pipeline lokal auszuführen, setzen Sie PipelineOptions als Runner auf "DirectRunner". Sie müssen den Runner jedoch nicht explizit angeben, es sei denn, Sie haben eine bestimmte Einstellung.
Führen Sie den folgenden Befehl über die Befehlszeile aus. Schreiben Sie die Eingabeziel- und Ausgabezielpfade je nach Umgebung neu.
python pipeline.py --input=./input.txt --output=./output.txt
Dieses Implementierungsbeispiel ist eine Pipeline, die die Anzahl der Zeichen in einem Wort zählt, sodass das folgende Ergebnis ausgegeben wird.
Außerdem fügt "beam.io.WriteToText "standardmäßig die Zeichenfolge" 00000-of-00001 "am Ende des Dateinamens hinzu, um sie zu verteilen und in mehrere Dateien zu schreiben. Wenn Sie in eine Datei schreiben möchten, können Sie dies tun, indem Sie das Argument shard_name_template
leeren.
output.txt-00000-of-00001
13
15
13
Cloud Dataflow ist ein vollständig verwalteter Dienst von GCP (Google Cloud Platfom), der Daten im Stream- oder Batch-Modus verarbeitet. .. Benutzer können eine große Datenmenge verarbeiten, indem sie eine praktisch unbegrenzte Kapazität auf Pay-as-you-go-Basis nutzen, ohne sich um den Betrieb von Infrastrukturen wie Servern sorgen zu müssen.
Durch Ausführen der Pipeline in Cloud Dataflow wird ein Job in Ihrem GCP-Projekt erstellt, der Compute Engine- und Cloud Storage-Ressourcen verwendet. Aktivieren Sie ** Dataflow API ** in GCP, um Cloud Dataflow nutzen zu können.
Eine kleine Änderung ist erforderlich, um [Abgeschlossener Code](# Abgeschlossener Code) in Cloud Dataflow auszuführen. Ändern Sie es wie folgt.
pipeline.py
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions
GCP_PROJECT_ID = 'my-project-id'
GCS_BUCKET_NAME = 'gs://my-bucket-name'
JOB_NAME = 'compute-word-length'
class MyOptions(PipelineOptions):
"""Benutzerdefinierte Optionen."""
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--input',
default='{}/input.txt'.format(GCS_BUCKET_NAME), #Eingabe in GCS.Setzen Sie txt
help='Input for the pipeline')
parser.add_argument(
'--output',
default='{}/output.txt'.format(GCS_BUCKET_NAME), #Ausgabe an GCS
help='Output for the pipeline')
class ComputeWordLength(beam.DoFn):
"""Konvertierungsprozess zum Ermitteln der Anzahl der Zeichen."""
def __init__(self):
pass
def process(self, element):
yield len(element)
def run():
options = MyOptions()
#GCP-Option
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = GCP_PROJECT_ID #Projekt-ID
google_cloud_options.job_name = JOB_NAME #Beliebiger Jobname
google_cloud_options.staging_location = '{}/binaries'.format(GCS_BUCKET_NAME) #GCS-Pfad zum Staging von Dateien
google_cloud_options.temp_location = '{}/temp'.format(GCS_BUCKET_NAME) #GCS-Pfad für temporäre Dateien
#Arbeiteroptionen
options.view_as(WorkerOptions).autoscaling_algorithm = 'THROUGHPUT_BASED' #Aktivieren Sie die automatische Skalierung
#Standardoption
options.view_as(StandardOptions).runner = 'DataflowRunner' #Geben Sie den Datenflussläufer an
p = beam.Pipeline(options=options)
(p
| 'ReadFromText' >> beam.io.ReadFromText(options.input)
| 'ComputeWordLength' >> beam.ParDo(ComputeWordLength())
| 'WriteToText' >> beam.io.WriteToText(options.output, shard_name_template=""))
p.run()
# p.run().wait_until_finish() #Blockieren, bis die Pipeline abgeschlossen ist
if __name__ == '__main__':
run()
Weitere Datenflussoptionen finden Sie hier (https://cloud.google.com/dataflow/docs/guides/specifying-exec-params?hl=ja#-cloud-dataflow--). Die Option "Streaming" muss "true" sein, um das Streaming durchzuführen.
Dies kann auch mit einem ähnlichen Befehl ausgeführt werden.
python pipeline.py --input=gs://my-project-id/input.txt --output=gs://my-project-id/output.txt
Im Programm festgelegte Optionen können auch über solche Befehlszeilenargumente übergeben werden.
python pipeline.py \
--input=gs://my-project-id/input.txt \
--output=gs://my-project-id/output.txt \
--runner=DataflowRunner \
--project=my-project-id \
--temp_location=gs://my-project-id/tmp/
...
Sie können die Pipeline überwachen, indem Sie über das GCP auf den Datenflussdienst zugreifen. Die Benutzeroberfläche sieht folgendermaßen aus und das Ergebnis wird an den angegebenen Pfad ausgegeben.
Wenn Sie eine solche Stapelverarbeitung von Dataflow regelmäßig ausführen möchten, ist es zweckmäßig, ** Dataflow-Vorlage ** zu verwenden. Weitere Informationen finden Sie unter hier.
Beim Testen von Pipelines kann ** das Testen lokaler Einheiten häufig viel Zeit und Mühe sparen ** als das Debuggen von Remote-Läufen wie Dataflow.
Sie müssen Folgendes installieren, um die Abhängigkeit aufzulösen:
pip install nose
Verwenden Sie zum Testen der Pipeline das Objekt "TestPipeline". Anstatt die Eingabe von einer externen Quelle zu lesen, verwenden Sie "apache_beam.Create", um eine PC-Sammlung aus dem Arbeitsspeicher zu erstellen. Vergleichen Sie die Ausgabe mit assert_that
.
test_pipeline.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
from src.pipeline import ComputeWordLength
class PipelineTest(TestCase):
def test_pipeline(self):
expected = [
13,
15,
13
]
inputs = [
'good morning.',
'good afternoon.',
'good evening.'
]
with TestPipeline() as p:
actual = (p
| beam.Create(inputs)
| beam.ParDo(ComputeWordLength()))
assert_that(actual, equal_to(expected))
In [oben](Anwenden von # transform-) haben wir das Design (Verarbeitungsablauf) beim Erstellen einer einfachen Pipeline und einer Verzweigungspipeline kurz erläutert. Hier werden wir andere gängige Pipeline-Designs vorstellen.
Dies kann mithilfe der Funktion Zusätzliche Ausgaben von Apache Beam (https://beam.apache.org/documentation/programming-guide/#additional-outputs) erreicht werden.
class ExtractWord(beam.DoFn):
def process(element):
if element.startswith('A'):
yield pvalue.TaggedOutput('a', element) #Geben Sie einen Tag-Namen ein (Anfang'A'Wenn es ein Element von ist'a')
elif element.startswith('B'):
yield pvalue.TaggedOutput('b', element) #Geben Sie einen Tag-Namen ein (Anfang'B'Wenn es ein Element von ist'b')
mixed_col = db_row_col | beam.ParDo(ExtractWord()).with_outputs()
mixed_col.a | beam.ParDo(...) # .Kann über den Tag-Namen aufgerufen werden
mixed_col.b | beam.ParDo(...)
Dies kann durch Verwendung von "Abflachen" erreicht werden.
col_list = (a_col, b_col) | beam.Flatten()
Sie können aus jeder Eingabequelle eine PC-Sammlung erstellen und diese mit CoGroupByKey
usw. verknüpfen.
user_address = p | beam.io.ReadFromText(...)
user_order = p | beam.io.ReadFromText(...)
joined_col = (user_address, user_order) | beam.CoGroupByKey()
joined_col | beam.ParDo(...)
Möglicherweise möchten Sie auch die folgenden Funktionen kennen, damit Sie verschiedene Anwendungsfälle behandeln können.
Composite transforms Zusammengesetzte Transformationen sind eine Kombination mehrerer Transformationen (ParDo, Combine, GroupByKey ...). Durch das Verschachteln mehrerer Transformationen wird Ihr Code modularer und verständlicher.
Um zusammengesetzte Transformationen zu implementieren, müssen Sie die Transform-Klasse erweitern und die expand-Methode überschreiben.
"""Eine Pipeline, die die Anzahl der Wörter in einem Satz zählt."""
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
class ComputeWordCount(beam.PTransform):
"""Zusammengesetzte Transformationen, die die Anzahl der Wörter zählen."""
def __init__(self):
pass
def expand(self, pcoll):
return (pcoll
| 'SplitWithHalfSpace' >> beam.Map(lambda element: element.split(' '))
| 'ComputeArraySize' >> beam.Map(lambda element: len(element)))
def run():
p = beam.Pipeline(options=PipelineOptions())
inputs = ['There is no time like the present.', 'Time is money.']
(p
| 'Create' >> beam.Create(inputs)
| 'ComputeWordCount' >> ComputeWordCount()
| 'WriteToText' >> beam.io.WriteToText('Zielpfad ausgeben'))
p.run()
if __name__ == '__main__':
run()
output
7
3
Side inputs Seiteneingänge sind eine Funktion, mit der Sie zusätzlich zu den normalen Eingängen (Haupteingang) PCollection zusätzliche Eingänge (sekundäre Eingänge) an eine Transformation übergeben können.
"""Eine Pipeline, die nur Zeichenfolgen mit mehr als durchschnittlichen Zeichen ausgibt."""
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam import pvalue
class FilterMeanLengthFn(beam.DoFn):
"""Filtern Sie Zeichenfolgen mit überdurchschnittlicher Anzahl von Zeichen."""
def __init__(self):
pass
# mean_word_Länge ist eine Untereingabe
def process(self, element, mean_word_length):
if len(element) >= mean_word_length:
yield element
def run():
p = beam.Pipeline(options=PipelineOptions())
inputs = ["good morning.", "good afternoon.", "good evening."]
#Sub-Input
mean_word_length = word_lengths | 'ComputeMeanWordLength' >> beam.CombineGlobally(beam.combiners.MeanCombineFn())
#Haupteingang
output = (p
| 'Create' >> beam.Create(inputs)
| 'FilterMeanLength' >> beam.ParDo(FilterMeanLengthFn(), pvalue.AsSingleton(mean_word_length)) #Fügen Sie eine Untereingabe in das zweite Argument von ParDo ein
| 'write to text' >> beam.io.WriteToText('Zielpfad ausgeben'))
p.run().wait_until_finish()
if __name__ == '__main__':
run()
Die Anzahl der Zeichen "Guten Morgen", "Guten Tag" und "Guten Abend" beträgt "13", "15" bzw. "13", und der Durchschnitt liegt bei etwa 13,67. Die Ausgabe ist also wie folgt.
output
good afternoon.
Es beschreibt ein wenig über "was in der Pipeline passiert".
Eine der kostspieligsten Operationen bei der verteilten Pipeline-Verarbeitung ist das ** Serialisieren und Kommunizieren von Elementen zwischen Maschinen **. Der Apache Beam Runner serialisiert die Elemente der PCollection, beispielsweise weil er zwischen Maschinen kommuniziert. Kommunizieren Sie im nächsten Schritt Elemente zwischen der Transformation und der Transformation mit den folgenden Techniken:
Apache Beam konzentriert sich auf das Problem Peinlich parallel. Da Apache Beam der parallelen Verarbeitung von Elementen große Bedeutung beimisst, ist es nicht gut, Aktionen wie ** Zuweisen von Sequenznummern zu jedem Element von PCollection ** auszudrücken. Dies liegt daran, dass solche Algorithmen viel häufiger Skalierbarkeitsprobleme aufweisen.
** Die parallele Verarbeitung aller Elemente ** hat auch einige Nachteile. Zum Beispiel beim Schreiben eines Elements in das Ausgabeziel. Bei der Ausgabeverarbeitung ist es nicht möglich, alle Elemente parallel zu stapeln.
Daher verarbeitet der Apache Beam-Runner nicht alle Elemente gleichzeitig, sondern bündelt und verarbeitet die Elemente der PCollection. Bei der Streaming-Verarbeitung wird es tendenziell in kleinen Einheiten gebündelt und verarbeitet, und bei der Stapelverarbeitung wird es tendenziell in größeren Einheiten gebündelt und verarbeitet.
Wenn Sie ein einzelnes ParDo ausführen, kann der Apache Beam-Läufer die Elemente der PC-Sammlung in zwei Teile teilen und bündeln.
Wenn das ParDo ausgeführt wird, verarbeitet der Worker die beiden Bundles parallel, wie unten gezeigt.
Da ein einzelnes Element nicht geteilt werden kann, hängt die maximale Parallelverarbeitung einer Transformation von der Anzahl der Elemente in der PCollection ab. Die maximale Anzahl paralleler Prozesse beträgt in diesem Fall ** 9 **, wie aus der Abbildung ersichtlich.
ParDos können untergeordnete Parallelen sein. Beispielsweise sind ParDo1 und ParDo2 parallel untergeordnet, wenn die Ausgabe von ParDo1 von demselben Mitarbeiter wie folgt verarbeitet werden muss:
Worker1 führt ParDo1 für die Elemente von Bundle A aus, das zu Bundle C wird. Dann wird ParDo2 für die Elemente von Bundle C ausgeführt. In ähnlicher Weise führt Worker2 ParDo1 für die Elemente von Bundle B aus, das zu Bundle D wird. Dann wird ParDo2 für die Elemente von Bundle D ausgeführt.
Durch die Ausführung von ParDo auf diese Weise können Apache Beam-Läufer vermeiden, Elemente zwischen Arbeitern neu zu verteilen. Das spart Kommunikationskosten. ** Die maximale Anzahl paralleler Prozesse hängt jetzt jedoch von der maximalen Anzahl paralleler Prozesse für das erste ParDo in der abhängigen Parallele ab. ** ** **
Wenn die Verarbeitung der Elemente im Bundle fehlschlägt, schlägt das gesamte Bundle fehl. Daher muss der Prozess erneut versucht werden (andernfalls schlägt die gesamte Pipeline fehl).
Im folgenden Beispiel verarbeitet Worker1 erfolgreich alle fünf Elemente von Bundle A. Worker2 behandelt die vier Elemente von Bundle B, aber die ersten beiden Elemente von Bundle B werden erfolgreich verarbeitet und das dritte Element ist nicht erfolgreich.
Der Apache Beam-Läufer wiederholt dann alle Elemente von Bundle B und wird beim zweiten Mal erfolgreich abgeschlossen. Wie gezeigt, werden ** Wiederholungsversuche nicht immer im selben Worker wie beim ursprünglichen Verarbeitungsversuch ausgeführt. ** ** **
Wenn die Elemente in ParDo2 nach der Verarbeitung von ParDo1 nicht verarbeitet werden können, schlagen diese beiden Transformationen gleichzeitig fehl.
Im folgenden Beispiel führt Worker2 ParDo1 erfolgreich für alle Elemente von Bundle B aus. ParDo2 schlägt jedoch fehl, da es die Elemente von Bundle D nicht verarbeiten kann.
Infolgedessen muss der Apache Beam-Läufer die ParDo2-Ausgabe verwerfen und den Vorgang erneut ausführen. In diesem Fall muss auch das ParDo1-Bundle zerstört und alle Elemente des ** Bundles erneut versucht werden. ** ** **
Ich habe versucht, das Gelernte anhand des Inhalts der Apache Beam-Dokumentation zusammenzufassen. Bitte weisen Sie auf Fehler hin! : Bogen:
Recommended Posts