[GO] Apache Beam (Datenfluss) Praktische Einführung [Python]

Einführung

Dieser Artikel basiert auf dem Inhalt der Apache Beam-Dokumentation (https://beam.apache.org/documentation/).

Es implementiert ein Stapelverarbeitungsprogramm mit Apache Beam Python SDK und fasst die Prozedur und Methode zur Ausführung mit Cloud Dataflow zusammen. Es werden auch die Grundkonzepte von Apache Beam, Testen und Design behandelt.

beam-logo-full-color-name-right-500.png

Erste Schritte mit dem Apache Beam SDK

Das Apache Beam SDK kann aus ** Java **, ** Python **, ** Go ** ausgewählt werden und bietet die folgenden ** Funktionen, die den verteilten Verarbeitungsmechanismus ** vereinfachen. tun.

Apache Beam-Ausführungsumgebung

Vom Apache Beam SDK erstellte Programme können auf verteilten Datenverarbeitungssystemen ausgeführt werden, z. In Apache Beam heißt diese Ausführungsumgebung ** Runner **.

Dieses Mal werden wir es in zwei Ausführungsumgebungen ausführen, DirectRunner und DataflowRunner.

Pipeline-Implementierung

Ein allgemeines (einfaches) Apache Beam-Programm wird erstellt und funktioniert wie folgt.

  1. Erstellen Sie ein ** Pipeline-Objekt ** und legen Sie die Ausführungsoptionen fest.
  2. Verwenden Sie ** Read Transform **, um Daten von einem externen Speichersystem oder In-Memory zu lesen und ** eine PCollection zu erstellen **.
  3. Wenden Sie ** Transform ** auf PCollection an. Transformieren kann die Elemente in PCollection mit verschiedenen Logiken transformieren.
  4. Wenden Sie ** Write Transform ** an, um die durch Transform transformierte PCollection in eine externe Quelle zu schreiben.

Für diesen Prozessablauf wäre die Pipeline wie folgt:

image.png

Lassen Sie uns tatsächlich eine einfache Pipeline wie die oben in Python implementieren. Die Betriebsumgebung wird wie folgt angenommen.

Installieren des Apache Beam SDK

Wenn Sie keine zusätzlichen Pakete benötigen, installieren Sie diese mit dem folgenden Befehl:

pip install apache-beam

Dieses Mal gehen wir davon aus, dass es auf Dataflow (GCP) ausgeführt wird, daher werden wir auch zusätzliche GCP-Pakete installieren.

pip install apache-beam[gcp]

Abschlusscode

Dies ist der vollständige Code. Ich werde jeden von ihnen unten erklären.

pipeline.py


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions


class MyOptions(PipelineOptions):
    """Benutzerdefinierte Optionen."""
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--input',
            default='./input.txt',
            help='Input path for the pipeline')

        parser.add_argument(
            '--output',
            default='./output.txt',
            help='Output path for the pipeline')


class ComputeWordLength(beam.DoFn):
    """Konvertierungsprozess zum Ermitteln der Anzahl der Zeichen."""

    def __init__(self):
        pass

    def process(self, element):
        yield len(element)


def run():
    options = MyOptions()
    # options.view_as(StandardOptions).runner = 'DirectRunner'
    p = beam.Pipeline(options=options)

    (p
     | 'ReadFromText' >> beam.io.ReadFromText(options.input)  # I/Wenden Sie O Transform an, um Daten in den optionalen Pfad zu laden
     | 'ComputeWordLength' >> beam.ParDo(ComputeWordLength())  #Transform anwenden anwenden
     | 'WriteToText' >> beam.io.WriteToText(options.output))  # I/Wenden Sie O Transform an und schreiben Sie Daten in den optionalen Pfad

    p.run()


if __name__ == '__main__':
    run()

Pipeline Das Pipeline-Objekt ** kapselt alle Ihre Datenverarbeitungsaufgaben **. Apache Beam-Programme erstellen normalerweise zuerst ein Pipeline-Objekt, um eine PCollection zu erstellen und eine Transformation anzuwenden.

Erstellen einer Pipeline

Um das Apache Beam-Programm verwenden zu können, müssen Sie zuerst eine Instanz der Apache Beam SDK-Pipeline erstellen (normalerweise innerhalb der Hauptfunktion). Wenn Sie dann die Pipeline erstellen, legen Sie die Ausführungsoptionen fest.

Der folgende Code ist ein Beispiel für die Erstellung einer Pipeline-Instanz.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


options = PipelineOptions()  #Ausführungsoptionen
p = beam.Pipeline(options=options)

PipelineOptions-Einstellungen

Mit PipelineOptions können Sie die Läufer festlegen, auf denen die Pipeline ausgeführt wird, und ** spezifische Optionen, die für den ausgewählten Läufer ** erforderlich sind. Beispielsweise kann es Informationen wie die Projekt-ID und den Speicherort der Datei enthalten.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions


options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DirectRunner'  #Bezeichnung des Läufers

p = beam.Pipeline(options=options)

Es gibt zwei Möglichkeiten: Die eine besteht darin, sie programmgesteuert festzulegen, und die andere darin, sie über ein Befehlszeilenargument zu übergeben. Ein Beispiel wird unten unter [Mit Cloud-Datenfluss ausführen](Ausführen mit # Cloud-Datenfluss-) beschrieben.

Fügen Sie benutzerdefinierte Optionen hinzu

Sie können zusätzlich zu den Standard-PipelineOptions ** benutzerdefinierte Optionen ** hinzufügen. Im folgenden Beispiel wird eine Option zum Angeben der Eingabe- und Ausgabepfade hinzugefügt. Mit benutzerdefinierten Optionen können Sie auch eine Beschreibung oder einen Standardwert angeben, die angezeigt werden, wenn der Benutzer "--help" über ein Befehlszeilenargument übergibt.

Sie können benutzerdefinierte Optionen erstellen, indem Sie ** von PipelineOptions erben **.

class MyOptions(PipelineOptions):
    """Benutzerdefinierte Optionen."""
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--input',  #Optionsname
            default='./input.txt',  #Standardwert
            help='Input path for the pipeline')  #Erläuterung

        parser.add_argument(
            '--output',
            default='./output.txt',
            help='Output path for the pipeline')

Übergeben Sie die von Ihnen erstellten Optionen wie folgt:

p = beam.Pipeline(options=MyOptions())

Übergeben Sie einen Wert aus dem Befehlszeilenargument wie folgt, um eine benutzerdefinierte Option auf einen anderen Wert als den Standardwert festzulegen:

--input=value --output==value

PCollection Eine PCollection ist ein ** Objekt, das einen zu verteilenden Datensatz darstellt **. In der Apache Beam-Pipeline verwendet Transform PCollection als Eingabe und Ausgabe. Wenn Sie die Daten in der Pipeline verarbeiten möchten, müssen Sie daher eine PCollection erstellen.

Nach dem Erstellen eines Pipeline-Objekts müssen Sie zunächst mindestens eine PC-Sammlung erstellen.

Erstellen einer PC-Sammlung

Verwenden Sie die E / A-Transformation, um Daten von einer externen Quelle zu lesen oder eine PC-Sammlung aus dem Arbeitsspeicher zu erstellen. Letzteres ist hauptsächlich zum Testen und Debuggen nützlich.

Erstellen Sie eine PC-Sammlung aus einer externen Quelle

Verwenden Sie die E / A-Transformation, um eine PC-Sammlung aus einer externen Quelle zu erstellen. Wenden Sie zum Lesen der Daten die von jeder E / A-Transformation bereitgestellte Lesetransformation auf das Pipeline-Objekt an.

So wenden Sie eine Lesetransformation auf eine Pipeline an, um eine PCollection zu erstellen:

lines = p | 'ReadFromText' >> beam.io.ReadFromText('gs://some/input-data.txt')

Erstellen Sie eine PC-Sammlung aus dem Arbeitsspeicher

Verwenden Sie Create Transform, um eine PC-Sammlung aus dem Arbeitsspeicher zu erstellen.

lines = (p | 'ReadFromInMemory' >> beam.Create(['To be, or not to be: that is the question: ', 'Whether \'tis nobler in the mind to suffer ', 'The slings and arrows of outrageous fortune, ', 'Or to take arms against a sea of troubles, ']))

Transform Transform bietet ein ** allgemeines Verarbeitungsframework **. Die Transformation wird auf jedes Element der Eingabe-PC-Sammlung angewendet.

Das Apache Beam SDK bietet eine Vielzahl von Transformationen, die Sie auf Ihre PCollection anwenden können. Dies umfasst generische ** Core-Transformationen ** wie ParDo und Combine sowie ** Composite-Transformationen **, die eine oder mehrere Core-Transformationen kombinieren. Es werden verschiedene Transformationen bereitgestellt. Weitere Informationen finden Sie unter hier.

Transform anwenden anwenden

Jede Transformation im Apache Beam SDK enthält den Pipe-Operator |, sodass Sie die Transformation anwenden können, indem Sie diese Methode auf die Eingabe-PCollection anwenden.

[Output PCollection] = [Input PCollection] | [Transform]

Sie können Transformationen auch verketten, um eine Pipeline wie folgt zu erstellen:

[Output PCollection] = ([Initial Input PCollection] 
                             | [First Transform]
                             | [Second Transform]
                             | [Third Transform])

Diese Pipeline hat den gleichen Ablauf wie dieses Implementierungsbeispiel, sodass die Pipeline diese Form hat.

image.png

Transform erstellt eine neue PCollection, ohne Änderungen an der eingegebenen PCollection vorzunehmen. ** Transform ändert die Eingabe-PC-Sammlung nicht. ** PCollection ist per Definition unveränderlich. Daher können Sie mehrere Transformationen auf dieselbe PCollection anwenden, um die PCollection zu verzweigen.

[Output PCollection] = [Initial Input PCollection]

[Output PCollection A] = [Output PCollection] | [Transform A]
[Output PCollection B] = [Output PCollection] | [Transform B]

Die Form dieser Pipeline sieht folgendermaßen aus:

image.png

I/O Transform Wenn Sie eine Pipeline erstellen, müssen Sie häufig Daten aus einer externen Quelle wie einer Datei oder Datenbank lesen. Ebenso können Sie Daten aus der Pipeline an ein externes Speichersystem ausgeben.

Das Apache Beam SDK bietet eine E / A-Transformation für die allgemeinen Datenspeichertypen (https://beam.apache.org/documentation/io/built-in/). Wenn Sie nicht unterstützten Datenspeicher lesen oder schreiben möchten, müssen Sie Ihre eigene E / A-Transformation implementieren.

Daten lesen

Read Transform transformiert gelesene Daten von einer externen Quelle in eine PCollection. Sie können die Lesetransformation jederzeit beim Erstellen der Pipeline verwenden, sie wird jedoch in der Regel zuerst ausgeführt.

lines = pipeline | beam.io.ReadFromText('gs://some/input-data.txt')

Daten schreiben

Write Transform schreibt die Daten in der PCollection in eine externe Datenquelle. Verwenden Sie zum Drucken der Ergebnisse einer Pipeline in den meisten Fällen die Schreibtransformation am Ende der Pipeline.

output | beam.io.WriteToText('gs://some/output-data')

Lesen aus mehreren Dateien

Viele Lesetransformationen unterstützen das Lesen aus mehreren Eingabedateien, die dem Glob-Operator entsprechen. Im folgenden Beispiel wird der Glob-Operator (*) verwendet, um alle übereinstimmenden Eingabedateien mit dem Präfix "input-" und dem Suffix ".csv" am angegebenen Speicherort zu lesen.

lines = p | 'ReadFromText' >> beam.io.ReadFromText('path/to/input-*.csv')

Schreiben in mehrere Dateien

Write Transform schreibt standardmäßig in mehrere Dateien. Dabei wird der Dateiname als Präfix für alle Ausgabedateien verwendet.

Im folgenden Beispiel werden mehrere Dateien an einen Speicherort geschrieben. Jeder Datei wird "Zahlen" vorangestellt und ".csv" angehängt.

output | 'WriteToText' >> beam.io.WriteToText('/path/to/numbers', file_name_suffix='.csv')

Pipeline ausführen

Lassen Sie uns nun die Pipeline mit Completed Code (#Completed Code) ausführen. Lokal ausführen und Cloud Dataflow als Ausführungsumgebung.

Bereiten Sie eine Textdatei vor, die die folgende Zeichenfolge zur Eingabe enthält.

input.txt


good morning.
good afternoon.
good evening.

Lokal ausführen

Um die Pipeline lokal auszuführen, setzen Sie PipelineOptions als Runner auf "DirectRunner". Sie müssen den Runner jedoch nicht explizit angeben, es sei denn, Sie haben eine bestimmte Einstellung.

Führen Sie den folgenden Befehl über die Befehlszeile aus. Schreiben Sie die Eingabeziel- und Ausgabezielpfade je nach Umgebung neu.

python pipeline.py --input=./input.txt --output=./output.txt

Dieses Implementierungsbeispiel ist eine Pipeline, die die Anzahl der Zeichen in einem Wort zählt, sodass das folgende Ergebnis ausgegeben wird. Außerdem fügt "beam.io.WriteToText "standardmäßig die Zeichenfolge" 00000-of-00001 "am Ende des Dateinamens hinzu, um sie zu verteilen und in mehrere Dateien zu schreiben. Wenn Sie in eine Datei schreiben möchten, können Sie dies tun, indem Sie das Argument shard_name_template leeren.

output.txt-00000-of-00001


13
15
13

In Cloud Dataflow ausführen

Cloud Dataflow ist ein vollständig verwalteter Dienst von GCP (Google Cloud Platfom), der Daten im Stream- oder Batch-Modus verarbeitet. .. Benutzer können eine große Datenmenge verarbeiten, indem sie eine praktisch unbegrenzte Kapazität auf Pay-as-you-go-Basis nutzen, ohne sich um den Betrieb von Infrastrukturen wie Servern sorgen zu müssen.

Durch Ausführen der Pipeline in Cloud Dataflow wird ein Job in Ihrem GCP-Projekt erstellt, der Compute Engine- und Cloud Storage-Ressourcen verwendet. Aktivieren Sie ** Dataflow API ** in GCP, um Cloud Dataflow nutzen zu können.

Eine kleine Änderung ist erforderlich, um [Abgeschlossener Code](# Abgeschlossener Code) in Cloud Dataflow auszuführen. Ändern Sie es wie folgt.

pipeline.py


import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions


GCP_PROJECT_ID = 'my-project-id'
GCS_BUCKET_NAME = 'gs://my-bucket-name'
JOB_NAME = 'compute-word-length'


class MyOptions(PipelineOptions):
    """Benutzerdefinierte Optionen."""
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--input',
            default='{}/input.txt'.format(GCS_BUCKET_NAME),  #Eingabe in GCS.Setzen Sie txt
            help='Input for the pipeline')

        parser.add_argument(
            '--output',
            default='{}/output.txt'.format(GCS_BUCKET_NAME),  #Ausgabe an GCS
            help='Output for the pipeline')


class ComputeWordLength(beam.DoFn):
    """Konvertierungsprozess zum Ermitteln der Anzahl der Zeichen."""

    def __init__(self):
        pass

    def process(self, element):
        yield len(element)


def run():
    options = MyOptions()

    #GCP-Option
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = GCP_PROJECT_ID  #Projekt-ID
    google_cloud_options.job_name = JOB_NAME  #Beliebiger Jobname
    google_cloud_options.staging_location = '{}/binaries'.format(GCS_BUCKET_NAME)  #GCS-Pfad zum Staging von Dateien
    google_cloud_options.temp_location = '{}/temp'.format(GCS_BUCKET_NAME)  #GCS-Pfad für temporäre Dateien

    #Arbeiteroptionen
    options.view_as(WorkerOptions).autoscaling_algorithm = 'THROUGHPUT_BASED'  #Aktivieren Sie die automatische Skalierung

    #Standardoption
    options.view_as(StandardOptions).runner = 'DataflowRunner'  #Geben Sie den Datenflussläufer an

    p = beam.Pipeline(options=options)

    (p
     | 'ReadFromText' >> beam.io.ReadFromText(options.input)
     | 'ComputeWordLength' >> beam.ParDo(ComputeWordLength())
     | 'WriteToText' >> beam.io.WriteToText(options.output, shard_name_template=""))

    p.run()
    # p.run().wait_until_finish()  #Blockieren, bis die Pipeline abgeschlossen ist


if __name__ == '__main__':
    run()

Weitere Datenflussoptionen finden Sie hier (https://cloud.google.com/dataflow/docs/guides/specifying-exec-params?hl=ja#-cloud-dataflow--). Die Option "Streaming" muss "true" sein, um das Streaming durchzuführen.

Dies kann auch mit einem ähnlichen Befehl ausgeführt werden.

python pipeline.py --input=gs://my-project-id/input.txt --output=gs://my-project-id/output.txt

Im Programm festgelegte Optionen können auch über solche Befehlszeilenargumente übergeben werden.

python pipeline.py \
  --input=gs://my-project-id/input.txt \
  --output=gs://my-project-id/output.txt \
  --runner=DataflowRunner \
  --project=my-project-id \
  --temp_location=gs://my-project-id/tmp/
  ...

Sie können die Pipeline überwachen, indem Sie über das GCP auf den Datenflussdienst zugreifen. Die Benutzeroberfläche sieht folgendermaßen aus und das Ergebnis wird an den angegebenen Pfad ausgegeben.

スクリーンショット 2020-01-03 15.36.16.png

Wenn Sie eine solche Stapelverarbeitung von Dataflow regelmäßig ausführen möchten, ist es zweckmäßig, ** Dataflow-Vorlage ** zu verwenden. Weitere Informationen finden Sie unter hier.

Pipeline-Tests

Beim Testen von Pipelines kann ** das Testen lokaler Einheiten häufig viel Zeit und Mühe sparen ** als das Debuggen von Remote-Läufen wie Dataflow.

Sie müssen Folgendes installieren, um die Abhängigkeit aufzulösen:

pip install nose

Implementierungsbeispiel

Verwenden Sie zum Testen der Pipeline das Objekt "TestPipeline". Anstatt die Eingabe von einer externen Quelle zu lesen, verwenden Sie "apache_beam.Create", um eine PC-Sammlung aus dem Arbeitsspeicher zu erstellen. Vergleichen Sie die Ausgabe mit assert_that.

test_pipeline.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

from src.pipeline import ComputeWordLength


class PipelineTest(TestCase):

    def test_pipeline(self):
        expected = [
            13,
            15,
            13
        ]

        inputs = [
            'good morning.',
            'good afternoon.',
            'good evening.'
        ]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ParDo(ComputeWordLength()))

            assert_that(actual, equal_to(expected))

Pipeline-Design

In [oben](Anwenden von # transform-) haben wir das Design (Verarbeitungsablauf) beim Erstellen einer einfachen Pipeline und einer Verzweigungspipeline kurz erläutert. Hier werden wir andere gängige Pipeline-Designs vorstellen.

Eine Pipeline mit einer Transformation, die mehrere PC-Sammlungen erzeugt

image.png

Dies kann mithilfe der Funktion Zusätzliche Ausgaben von Apache Beam (https://beam.apache.org/documentation/programming-guide/#additional-outputs) erreicht werden.

class ExtractWord(beam.DoFn):

   def process(element):
        if element.startswith('A'):
            yield pvalue.TaggedOutput('a', element)  #Geben Sie einen Tag-Namen ein (Anfang'A'Wenn es ein Element von ist'a')
        elif element.startswith('B'):
            yield pvalue.TaggedOutput('b', element)  #Geben Sie einen Tag-Namen ein (Anfang'B'Wenn es ein Element von ist'b')


mixed_col = db_row_col | beam.ParDo(ExtractWord()).with_outputs()

mixed_col.a | beam.ParDo(...)  # .Kann über den Tag-Namen aufgerufen werden
mixed_col.b | beam.ParDo(...)

Eine Pipeline mit einer Transformation, die PCollections verbindet

image.png

Dies kann durch Verwendung von "Abflachen" erreicht werden.

col_list = (a_col, b_col) | beam.Flatten()

Pipeline mit mehreren Eingabequellen

image.png

Sie können aus jeder Eingabequelle eine PC-Sammlung erstellen und diese mit CoGroupByKey usw. verknüpfen.

user_address = p | beam.io.ReadFromText(...)
user_order = p | beam.io.ReadFromText(...)

joined_col = (user_address, user_order) | beam.CoGroupByKey()

joined_col | beam.ParDo(...)

Andere nützliche Funktionen

Möglicherweise möchten Sie auch die folgenden Funktionen kennen, damit Sie verschiedene Anwendungsfälle behandeln können.

Composite transforms Zusammengesetzte Transformationen sind eine Kombination mehrerer Transformationen (ParDo, Combine, GroupByKey ...). Durch das Verschachteln mehrerer Transformationen wird Ihr Code modularer und verständlicher.

Implementierungsbeispiel

Um zusammengesetzte Transformationen zu implementieren, müssen Sie die Transform-Klasse erweitern und die expand-Methode überschreiben.

"""Eine Pipeline, die die Anzahl der Wörter in einem Satz zählt."""
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class ComputeWordCount(beam.PTransform):
    """Zusammengesetzte Transformationen, die die Anzahl der Wörter zählen."""

    def __init__(self):
        pass

    def expand(self, pcoll):
        return (pcoll
                | 'SplitWithHalfSpace' >> beam.Map(lambda element: element.split(' '))
                | 'ComputeArraySize' >> beam.Map(lambda element: len(element)))


def run():
    p = beam.Pipeline(options=PipelineOptions())

    inputs = ['There is no time like the present.', 'Time is money.']

    (p
     | 'Create' >> beam.Create(inputs)
     | 'ComputeWordCount' >> ComputeWordCount()
     | 'WriteToText' >> beam.io.WriteToText('Zielpfad ausgeben'))

    p.run()

if __name__ == '__main__':
    run()
    

output


7
3

Side inputs Seiteneingänge sind eine Funktion, mit der Sie zusätzlich zu den normalen Eingängen (Haupteingang) PCollection zusätzliche Eingänge (sekundäre Eingänge) an eine Transformation übergeben können.

Implementierungsbeispiel

"""Eine Pipeline, die nur Zeichenfolgen mit mehr als durchschnittlichen Zeichen ausgibt."""
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam import pvalue


class FilterMeanLengthFn(beam.DoFn):
    """Filtern Sie Zeichenfolgen mit überdurchschnittlicher Anzahl von Zeichen."""

    def __init__(self):
        pass

    # mean_word_Länge ist eine Untereingabe
    def process(self, element, mean_word_length):
        if len(element) >= mean_word_length:
            yield element


def run():
    p = beam.Pipeline(options=PipelineOptions())

    inputs = ["good morning.", "good afternoon.", "good evening."]

    #Sub-Input
    mean_word_length = word_lengths | 'ComputeMeanWordLength' >> beam.CombineGlobally(beam.combiners.MeanCombineFn())

    #Haupteingang
    output = (p
              | 'Create' >> beam.Create(inputs)
              | 'FilterMeanLength' >> beam.ParDo(FilterMeanLengthFn(), pvalue.AsSingleton(mean_word_length))  #Fügen Sie eine Untereingabe in das zweite Argument von ParDo ein
              | 'write to text' >> beam.io.WriteToText('Zielpfad ausgeben'))

    p.run().wait_until_finish()


if __name__ == '__main__':
    run()

Die Anzahl der Zeichen "Guten Morgen", "Guten Tag" und "Guten Abend" beträgt "13", "15" bzw. "13", und der Durchschnitt liegt bei etwa 13,67. Die Ausgabe ist also wie folgt.

output


good afternoon.

Was ist in der Pipeline los?

Es beschreibt ein wenig über "was in der Pipeline passiert".

Serialisieren und kommunizieren

Eine der kostspieligsten Operationen bei der verteilten Pipeline-Verarbeitung ist das ** Serialisieren und Kommunizieren von Elementen zwischen Maschinen **. Der Apache Beam Runner serialisiert die Elemente der PCollection, beispielsweise weil er zwischen Maschinen kommuniziert. Kommunizieren Sie im nächsten Schritt Elemente zwischen der Transformation und der Transformation mit den folgenden Techniken:

  1. Serialisieren Sie das Element und leiten Sie es an den Worker weiter
  2. Serialisieren Sie das Element und verteilen Sie es an mehrere Mitarbeiter
  3. Bei Verwendung von Seiteneingängen muss das Element serialisiert und an alle Mitarbeiter gesendet werden
  4. Wenn die Transformation und die Transformation des nächsten Schritts von demselben Worker ausgeführt werden, werden die Elemente im Arbeitsspeicher kommuniziert (die Kommunikationskosten können reduziert werden, indem nicht serialisiert wird).

Gebündelt und hartnäckig

Apache Beam konzentriert sich auf das Problem Peinlich parallel. Da Apache Beam der parallelen Verarbeitung von Elementen große Bedeutung beimisst, ist es nicht gut, Aktionen wie ** Zuweisen von Sequenznummern zu jedem Element von PCollection ** auszudrücken. Dies liegt daran, dass solche Algorithmen viel häufiger Skalierbarkeitsprobleme aufweisen.

** Die parallele Verarbeitung aller Elemente ** hat auch einige Nachteile. Zum Beispiel beim Schreiben eines Elements in das Ausgabeziel. Bei der Ausgabeverarbeitung ist es nicht möglich, alle Elemente parallel zu stapeln.

Daher verarbeitet der Apache Beam-Runner nicht alle Elemente gleichzeitig, sondern bündelt und verarbeitet die Elemente der PCollection. Bei der Streaming-Verarbeitung wird es tendenziell in kleinen Einheiten gebündelt und verarbeitet, und bei der Stapelverarbeitung wird es tendenziell in größeren Einheiten gebündelt und verarbeitet.

Parallelverarbeitung

Parallelverarbeitung in Transform

Wenn Sie ein einzelnes ParDo ausführen, kann der Apache Beam-Läufer die Elemente der PC-Sammlung in zwei Teile teilen und bündeln.

image.png

Wenn das ParDo ausgeführt wird, verarbeitet der Worker die beiden Bundles parallel, wie unten gezeigt.

image.png

Da ein einzelnes Element nicht geteilt werden kann, hängt die maximale Parallelverarbeitung einer Transformation von der Anzahl der Elemente in der PCollection ab. Die maximale Anzahl paralleler Prozesse beträgt in diesem Fall ** 9 **, wie aus der Abbildung ersichtlich.

Parallele Verarbeitung zwischen Transformationen

ParDos können untergeordnete Parallelen sein. Beispielsweise sind ParDo1 und ParDo2 parallel untergeordnet, wenn die Ausgabe von ParDo1 von demselben Mitarbeiter wie folgt verarbeitet werden muss:

image.png

Worker1 führt ParDo1 für die Elemente von Bundle A aus, das zu Bundle C wird. Dann wird ParDo2 für die Elemente von Bundle C ausgeführt. In ähnlicher Weise führt Worker2 ParDo1 für die Elemente von Bundle B aus, das zu Bundle D wird. Dann wird ParDo2 für die Elemente von Bundle D ausgeführt.

image.png

Durch die Ausführung von ParDo auf diese Weise können Apache Beam-Läufer vermeiden, Elemente zwischen Arbeitern neu zu verteilen. Das spart Kommunikationskosten. ** Die maximale Anzahl paralleler Prozesse hängt jetzt jedoch von der maximalen Anzahl paralleler Prozesse für das erste ParDo in der abhängigen Parallele ab. ** ** **

Verhalten bei Auftreten eines Fehlers

Verhalten bei einem Fehler in der Transformation

Wenn die Verarbeitung der Elemente im Bundle fehlschlägt, schlägt das gesamte Bundle fehl. Daher muss der Prozess erneut versucht werden (andernfalls schlägt die gesamte Pipeline fehl).

Im folgenden Beispiel verarbeitet Worker1 erfolgreich alle fünf Elemente von Bundle A. Worker2 behandelt die vier Elemente von Bundle B, aber die ersten beiden Elemente von Bundle B werden erfolgreich verarbeitet und das dritte Element ist nicht erfolgreich.

Der Apache Beam-Läufer wiederholt dann alle Elemente von Bundle B und wird beim zweiten Mal erfolgreich abgeschlossen. Wie gezeigt, werden ** Wiederholungsversuche nicht immer im selben Worker wie beim ursprünglichen Verarbeitungsversuch ausgeführt. ** ** **

image.png

Verhalten, wenn zwischen Transformationen ein Fehler auftritt

Wenn die Elemente in ParDo2 nach der Verarbeitung von ParDo1 nicht verarbeitet werden können, schlagen diese beiden Transformationen gleichzeitig fehl.

Im folgenden Beispiel führt Worker2 ParDo1 erfolgreich für alle Elemente von Bundle B aus. ParDo2 schlägt jedoch fehl, da es die Elemente von Bundle D nicht verarbeiten kann.

Infolgedessen muss der Apache Beam-Läufer die ParDo2-Ausgabe verwerfen und den Vorgang erneut ausführen. In diesem Fall muss auch das ParDo1-Bundle zerstört und alle Elemente des ** Bundles erneut versucht werden. ** ** **

image.png

Zusammenfassung

Ich habe versucht, das Gelernte anhand des Inhalts der Apache Beam-Dokumentation zusammenzufassen. Bitte weisen Sie auf Fehler hin! : Bogen:

Recommended Posts

Apache Beam (Datenfluss) Praktische Einführung [Python]
Berühren Sie Apache Beam mit Python
Apache Beam Cheet Sheet [Python]
Einführung in Apache Beam mit Cloud-Datenfluss (über 2.0.0-Serie) ~ Grundlegender Teil ~ ParDo ~
Einführung in Apache Beam mit Google Cloud Dataflow (über 2.0.x-Serie) ~ Combine Edition ~
Einführung von Python
Einführung in die Python-Sprache
Einführung in OpenCV (Python) - (2)
Python-Grundkurs (Einführung)
Python-Anfängerhandbuch (Einführung)
Einführung in Apache Beam mit Google Cloud Dataflow (über 2.0.x-Serie) ~ Basic Group By Key ~
Einführung in Python Django (2) Win
Apache mod_auth_tkt und Python AuthTkt
Python3 + Django ~ Mac ~ mit Apache
Einführung in Aktivitäten mit Python
Einführung in die serielle Kommunikation [Python]
Apache auf Macports, Python 3.3 + mod_wsgi3.4 auf Nicht-Macports
Entwurfsmuster in Python: Einführung
[Einführung in Python] <Liste> [Bearbeiten: 22.02.2020]
Einführung in Python (Python-Version APG4b)
Eine Einführung in die Python-Programmierung
Einführung in Python For, While
Apache Beam 2.0.x mit Google Cloud Dataflow beginnend mit IntelliJ und Gradle