Dieser Artikel ist der 6. Tagesartikel von Puri Puri Appliance Adventskalender 2019.
Dieses Mal werde ich vorstellen, wie eine große Datenmenge durch verteilte Verarbeitung mit Dataflow (ApacheBeam) + Python3 verarbeitet wird, das ich normalerweise als ML-Ingenieur verwende.
Was diesmal einzuführen
Lassen Sie uns einen Blick auf den Ablauf vom tatsächlichen Erstellen der Umgebung bis zum Ausführen des Prozesses in Dataflow werfen.
Erstellen wir eine virtuelle Umgebung für die Entwicklung mit Pipenv. (Wenn die Versionen von Python und anderen Bibliotheken identisch sind, müssen Sie Pipenv nicht verwenden.) Starten Sie die Umgebung mit einem Pipfile wie dem folgenden.
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true
[dev-packages]
[packages]
apache-beam=="1.14.*"
[requires]
python_version = "3.7"
Punkte, auf die Sie achten sollten
--ApacheBeam SDK-Version Dataflow unterstützt SDK-Versionen bis 1.14.
Sie benötigen setup.py
, um es als Vorlage für Dataflow zu speichern und auszuführen.
Hier werden die Abhängigkeiten zur Laufzeit beschrieben.
entry_points
Bitte geben Sie dies gemäß Ihrer eigenen Paketkonfiguration an.
setup.py
PACKAGES = [
"apache-beam[gcp]==2.14.*",
]
setup(
name='dataflow-sample',
url='',
author='',
author_email='',
version='0.1',
install_requires=REQUIRED_PACKAGES,
packages=find_packages(),
include_package_data=True,
entry_points=dict(console_scripts=[
'sample=sample:main'
]),
description='dataflow sample',
)
Richten Sie die Apache Beam-Pipeline ein und implementieren Sie die Hauptfunktion. Die erforderlichen Schritte sind wie folgt.
setup_sample.py
import sys
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
def setup(args):
runner = "DirectRunner"
return beam.Pipeline(runner, options=PipelineOptions(args))
def main():
pipeline = setup(sys.args)
with pipeline as p:
#Beschreiben Sie die Pipeline
pass
Um tatsächlich mit Dataflow arbeiten zu können, müssen verschiedene im SDK angegebene Optionen festgelegt werden. Das Folgende ist eine Liste typischer.
StandardOptions
name | type | description |
---|---|---|
streaming | boolean | Wählen Sie den Streaming-Modus oder den Batch-Modus |
SetupOptions
name | type | description |
---|---|---|
setup_file | string | setup.Geben Sie den Pfad von py an |
GoogleCouldOptions
name | type | description |
---|---|---|
region | string | Geben Sie die zu verwendende Region an |
project | string | Geben Sie die zu verwendende Projekt-ID an |
job_name | string | Geben Sie den Namen an, als der Job ausgeführt wurde(Beliebiger Wert) |
template_location | string | Geben Sie den Pfad von GCP an, um die Vorlage zu speichern |
Diese Optionen müssen zur Laufzeit im Code oder in Befehlszeilenargumenten angegeben werden. Wenn durch Code angegeben, ist es wie folgt.
options_sample.py
def option_setting(options: PipelineOptions) -> PipelineOptions:
cloud_options = options.view_as(GoogleCloudOptions)
cloud_options.region = "asia-northeast1"
cloud_options.project = "your project id"
setup_options = options.view_as(SetupOptions)
setup_options.setup_file = "specify your setup.py path"
return options
def setup(args):
runner = "DirectRunner"
options = option_setting(PipelineOptions(args))
return beam.Pipeline(runner, options=options)
Grundsätzlich verhält es sich wie "Optionen", die Sie mit "PipelineOptions.view_as ()" festlegen möchten. Jetzt müssen Sie nur noch den Wert für die Eigenschaft festlegen, die Sie angeben möchten.
Sie können auch eigene benutzerdefinierte Optionen erstellen, wenn Sie zur Laufzeit über die erforderlichen Einstellungen verfügen. Die Implementierung erbt einfach "PipelineOptions" und überschreibt die erforderlichen Methoden.
costom_options_sample.py
class CostomOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--hoge',
type=int,
default=0,
help='This is Costom Value'
)
Definieren wir eine Pipeline, die tatsächlich Daten aus BigQuery liest und in BigQuery speichert. Als einfaches Beispiel implementieren wir den Prozess, bei dem nur die ID aus der Benutzertabelle extrahiert und in eine andere Tabelle eingefügt wird.
pipeline.py
def b2b_pipline(pipe: PCollection):
#Beschreiben Sie die ausgeführte SQL
query = "SELECT id, name, age FROM sample.users"
_ = (pipe
| "Read from BigQuery" >> beam.io.Read(BigQuerySource(query=query, use_standard_sql=True))
| "Preprocess" >> beam.Map(lambda data: data["id"])
| "Write to BigQuery" >> apache_beam.io.WriteToBigQuery(
table="user_ids",
schema="id:INTEGER",
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_APPEND,
)
)
Bei der Pipline-Verarbeitung werden drei Operationen ausgeführt: Eingabe, Zwischenoperation und Ausgabe. Es gibt viele andere Typen als die diesmal eingeführten, sodass Sie sie unter Bezugnahme auf die offizielle Referenz anpassen können.
Verschieben wir die implementierte Pipeline auf Local und GCP.
Beim Ausführen von Apache Beam stehen mehrere Umgebungen zur Auswahl.
Anwendungsfall | Runner | template_location |
---|---|---|
Auf lokal ausführen | DirectRunner | None |
Mit Datenfluss ausführen | DataflowRunner | None |
Als Vorlage in Dataflow ausführen | DataflowRunner | Geben Sie den GCS-Pfad zum Speichern der Vorlage an |
Durch Speichern der Vorlage können Sie die Pipeline in GCS speichern und über die Konsole oder die Befehlszeile starten. Dies ist sehr nützlich, wenn Sie Pipeline pünktlich ausführen möchten.
Dieses Mal habe ich vorgestellt, wie ApcheBeam in Python implementiert und in Dataflow ausgeführt wird. Ich hoffe es wird dir hilfreich sein.
Recommended Posts