Verarbeiten Sie Big Data mit Dataflow (ApacheBeam) + Python3

Einführung

Dieser Artikel ist der 6. Tagesartikel von Puri Puri Appliance Adventskalender 2019.

Dieses Mal werde ich vorstellen, wie eine große Datenmenge durch verteilte Verarbeitung mit Dataflow (ApacheBeam) + Python3 verarbeitet wird, das ich normalerweise als ML-Ingenieur verwende.

Was diesmal einzuführen

Ich werde es tatsächlich berühren

Lassen Sie uns einen Blick auf den Ablauf vom tatsächlichen Erstellen der Umgebung bis zum Ausführen des Prozesses in Dataflow werfen.

Umgebung

Erstellen wir eine virtuelle Umgebung für die Entwicklung mit Pipenv. (Wenn die Versionen von Python und anderen Bibliotheken identisch sind, müssen Sie Pipenv nicht verwenden.) Starten Sie die Umgebung mit einem Pipfile wie dem folgenden.

[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]

[packages]
apache-beam=="1.14.*"

[requires]
python_version = "3.7"

Punkte, auf die Sie achten sollten

--ApacheBeam SDK-Version Dataflow unterstützt SDK-Versionen bis 1.14.

Erstellen Sie Setup.py

Sie benötigen setup.py, um es als Vorlage für Dataflow zu speichern und auszuführen. Hier werden die Abhängigkeiten zur Laufzeit beschrieben. entry_points Bitte geben Sie dies gemäß Ihrer eigenen Paketkonfiguration an.

setup.py


PACKAGES = [
    "apache-beam[gcp]==2.14.*",
]

setup(
    name='dataflow-sample',
    url='',
    author='',
    author_email='',
    version='0.1',
    install_requires=REQUIRED_PACKAGES,
    packages=find_packages(),
    include_package_data=True,
    entry_points=dict(console_scripts=[
        'sample=sample:main'
    ]),
    description='dataflow sample',
)

Richten Sie die Pipeline ein

Richten Sie die Apache Beam-Pipeline ein und implementieren Sie die Hauptfunktion. Die erforderlichen Schritte sind wie folgt.

setup_sample.py


import sys
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def setup(args):
    runner = "DirectRunner" 
    return beam.Pipeline(runner, options=PipelineOptions(args))


def main():
    pipeline = setup(sys.args)
    with pipeline as p:
        #Beschreiben Sie die Pipeline
        pass

Optionen definieren

Um tatsächlich mit Dataflow arbeiten zu können, müssen verschiedene im SDK angegebene Optionen festgelegt werden. Das Folgende ist eine Liste typischer.

StandardOptions

name type description
streaming boolean Wählen Sie den Streaming-Modus oder den Batch-Modus

SetupOptions

name type description
setup_file string setup.Geben Sie den Pfad von py an

GoogleCouldOptions

name type description
region string Geben Sie die zu verwendende Region an
project string Geben Sie die zu verwendende Projekt-ID an
job_name string Geben Sie den Namen an, als der Job ausgeführt wurde(Beliebiger Wert)
template_location string Geben Sie den Pfad von GCP an, um die Vorlage zu speichern

Diese Optionen müssen zur Laufzeit im Code oder in Befehlszeilenargumenten angegeben werden. Wenn durch Code angegeben, ist es wie folgt.

options_sample.py


def option_setting(options: PipelineOptions) -> PipelineOptions:
    cloud_options = options.view_as(GoogleCloudOptions)
    cloud_options.region = "asia-northeast1"
    cloud_options.project = "your project id"
    
    setup_options = options.view_as(SetupOptions)
    setup_options.setup_file = "specify your setup.py path"
    return options

def setup(args):
    runner = "DirectRunner" 
    options = option_setting(PipelineOptions(args))
    return beam.Pipeline(runner, options=options)

Grundsätzlich verhält es sich wie "Optionen", die Sie mit "PipelineOptions.view_as ()" festlegen möchten. Jetzt müssen Sie nur noch den Wert für die Eigenschaft festlegen, die Sie angeben möchten.

Sie können auch eigene benutzerdefinierte Optionen erstellen, wenn Sie zur Laufzeit über die erforderlichen Einstellungen verfügen. Die Implementierung erbt einfach "PipelineOptions" und überschreibt die erforderlichen Methoden.

costom_options_sample.py


class CostomOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--hoge',
            type=int,
            default=0,
            help='This is Costom Value'
        )

Definieren Sie die Pipeline von BigQuery zu BigQuery

Definieren wir eine Pipeline, die tatsächlich Daten aus BigQuery liest und in BigQuery speichert. Als einfaches Beispiel implementieren wir den Prozess, bei dem nur die ID aus der Benutzertabelle extrahiert und in eine andere Tabelle eingefügt wird.

pipeline.py


def b2b_pipline(pipe: PCollection):

    #Beschreiben Sie die ausgeführte SQL
    query = "SELECT id, name, age FROM sample.users"
    
    _ = (pipe
        | "Read from BigQuery" >> beam.io.Read(BigQuerySource(query=query, use_standard_sql=True))
        | "Preprocess" >> beam.Map(lambda data: data["id"])
        | "Write to BigQuery" >> apache_beam.io.WriteToBigQuery(
                table="user_ids",
                schema="id:INTEGER",
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=BigQueryDisposition.WRITE_APPEND,
            ) 
        )

Bei der Pipline-Verarbeitung werden drei Operationen ausgeführt: Eingabe, Zwischenoperation und Ausgabe. Es gibt viele andere Typen als die diesmal eingeführten, sodass Sie sie unter Bezugnahme auf die offizielle Referenz anpassen können.

Über die Umgebung, die tatsächlich funktioniert

Verschieben wir die implementierte Pipeline auf Local und GCP.

Beim Ausführen von Apache Beam stehen mehrere Umgebungen zur Auswahl.

Anwendungsfall Runner template_location
Auf lokal ausführen DirectRunner None
Mit Datenfluss ausführen DataflowRunner None
Als Vorlage in Dataflow ausführen DataflowRunner Geben Sie den GCS-Pfad zum Speichern der Vorlage an

Durch Speichern der Vorlage können Sie die Pipeline in GCS speichern und über die Konsole oder die Befehlszeile starten. Dies ist sehr nützlich, wenn Sie Pipeline pünktlich ausführen möchten.

Am Ende

Dieses Mal habe ich vorgestellt, wie ApcheBeam in Python implementiert und in Dataflow ausgeführt wird. Ich hoffe es wird dir hilfreich sein.

Recommended Posts

Verarbeiten Sie Big Data mit Dataflow (ApacheBeam) + Python3
Verarbeiten Sie Pubmed .xml-Daten mit Python
Verarbeiten Sie Pubmed .xml-Daten mit Python [Teil 2]
Datenanalyse mit Python 2
Datenanalyse mit Python
Verarbeiten Sie CSV-Daten mit Python (Zählverarbeitung mit Pandas)
Verarbeiten Sie Feedly-XML mit Python.
Lesen von JSON-Daten mit Python
[Python] Mit DataReader Wirtschaftsdaten abrufen
Führen Sie XGBoost mit Cloud Dataflow (Python) aus.
Python-Datenstruktur mit Chemoinfomatik gelernt
Visualisieren Sie Ihre Daten ganz einfach mit Python Seaborn.
Datenanalyse beginnend mit Python (Datenvisualisierung 1)
Datenanalyse beginnend mit Python (Datenvisualisierung 2)
Anwendung von Python: Datenbereinigung Teil 2: Datenbereinigung mit DataFrame
Holen Sie sich mit Python zusätzliche Daten zu LDAP
Datenpipeline-Aufbau mit Python und Luigi
Empfangen Sie Textdaten von MySQL mit Python
[Hinweis] Mit Python Daten von PostgreSQL abrufen
Holen Sie sich Lebensmitteldaten mit Amazon API (Python)
Versuchen Sie, mit Binärdaten in Python zu arbeiten
Generieren Sie japanische Testdaten mit Python faker
Konvertieren Sie Excel-Daten mit Python in JSON
Laden Sie japanische Aktienkursdaten mit Python herunter
Bearbeiten von DynamoDB-Daten mit Lambda (Node & Python)
Verarbeiten Sie mehrere Listen mit for in Python
Datenanalyse beginnend mit Python (Datenvorverarbeitung - maschinelles Lernen)
Organisieren Sie mit Python nach Ordnern getrennte Daten
FizzBuzz in Python3
Scraping mit Python
Erstellen Sie solche Testdaten mit Python (Teil 1)
Statistik mit Python
Lesen Sie Daten mit python / netCDF> nc.variables [] / Überprüfen Sie die Datengröße
Scraping mit Python
Lesen Sie Python-CSV-Daten mit Pandas ⇒ Graph mit Matplotlib
Python mit Go
Datenanalyse Python
Lesen Sie Tabellendaten in einer PDF-Datei mit Python
In Python integrieren
Holen Sie sich Aktienkursdaten mit Quandl API [Python]
Ich habe versucht, CloudWatch-Daten mit Python abzurufen
AES256 mit Python
Getestet mit Python
Eine Geschichte über den Umgang mit Binärdaten in Python
Python beginnt mit ()
Folium: Visualisieren Sie Daten auf einer Karte mit Python
mit Syntax (Python)
Bingo mit Python
Schreiben Sie CSV-Daten mit AWS-Lambda + Python in AWS-S3
Zundokokiyoshi mit Python
Ich habe mit der maschinellen Vorverarbeitung von Python Data begonnen
Excel mit Python
[Python] Daten lesen
Mikrocomputer mit Python
Extrahieren Sie mit Python Daten von einer Webseite
Mit Python besetzen
[Python] Strukturiertes Array erstellen (heterogene Daten mit NumPy speichern)