[PYTHON] Cloud Dataflow Super Primer

Dies ist ein praktisches Dokument für GCPUG Beginners Tokyo # 3.


Diese Folie ist auf Qiita


bit.ly/dataflow-ho


Was ist Google Cloud Dataflow?

-Einfache Datenverarbeitung ――Die Arbeitsbelastung des Arbeitnehmers wird ohne Erlaubnis ausgeglichen ――Weil es mit GCE funktioniert, können Sie alles tun --Datalab (Jupyter) Sie können von oben bereitstellen!

** Einfach gesagt, eine super coole Version von MapReduce **


Tutorial-Repository

git clone https://github.com/hayatoy/dataflow-tutorial.git


Vorbereitungen

That's it!


Das Folgende ist die Code-Erklärung


Apache Beam importieren

import apache_beam as beam

Grundeinstellungen des Datenflusses

Geben Sie den Jobnamen, den Projektnamen und den Speicherort für temporäre Dateien an.

options = beam.utils.pipeline_options.PipelineOptions()
gcloud_options = options.view_as(
    beam.utils.pipeline_options.GoogleCloudOptions)
gcloud_options.job_name = 'dataflow-tutorial1'
gcloud_options.project = 'PROJECTID'
gcloud_options.staging_location = 'gs://PROJECTID/staging'
gcloud_options.temp_location = 'gs://PROJECTID/temp'

Einstellungen der Datenflussskala

Stellen Sie die maximale Anzahl von Arbeitern, Maschinentyp usw. ein. Die Festplattengröße des Workers ist ** groß, standardmäßig 250 GB (Batch) und 420 GB (Streaming) **. Daher wird empfohlen, hier die erforderliche Größe anzugeben.

worker_options = options.view_as(beam.utils.pipeline_options.WorkerOptions)
worker_options.disk_size_gb = 20
worker_options.max_num_workers = 2
# worker_options.num_workers = 2
# worker_options.machine_type = 'n1-standard-8'
# worker_options.zone = 'asia-northeast1-a'

Ausführungsumgebung wechseln

options.view_as(beam.utils.pipeline_options.StandardOptions).runner = 'DirectRunner'
# options.view_as(beam.utils.pipeline_options.StandardOptions).runner = 'DataflowRunner'

Bereit, Pipeline-Beispiel unten







Pipeline 1

Lesen Sie einfach die Datei aus GCS und schreiben Sie ihren Inhalt in GCS

+----------------+
|                |
| Read GCS File  |
|                |
+-------+--------+
        |
        v
+-------+--------+
|                |
| Write GCS File |
|                |
+----------------+
p1 = beam.Pipeline(options=options)

(p1 | 'read' >> beam.io.ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
    | 'write' >> beam.io.WriteToText('gs://PROJECTID/test.txt', num_shards=1)
 )

p1.run().wait_until_finish()

Pipeline Teil 2

Lesen Sie einfach die Daten von BigQuery und schreiben Sie den Inhalt in GCS Der BigQuery-Datensatz befindet sich unten https://bigquery.cloud.google.com/table/bigquery-public-data:samples.shakespeare

+----------------+
|                |
| Read BigQuery  |
|                |
+-------+--------+
        |
        v
+-------+--------+
|                |
| Write GCS File |
|                |
+----------------+
p2 = beam.Pipeline(options=options)

query = 'SELECT * FROM [bigquery-public-data:samples.shakespeare] LIMIT 10'
(p2 | 'read' >> beam.io.Read(beam.io.BigQuerySource(project='PROJECTID', use_standard_sql=False, query=query))
    | 'write' >> beam.io.WriteToText('gs://PROJECTID/test2.txt', num_shards=1)
 )

p2.run().wait_until_finish()

Pipeline Teil 3

Lesen Sie Daten aus BigQuery und schreiben Sie Daten in BigQuery

+----------------+
|                |
| Read BigQuery  |
|                |
+-------+--------+
        |
        v
+-------+--------+
|                |
| Write BigQuery |
|                |
+----------------+
p3 = beam.Pipeline(options=options)

#Hinweis: Erstellen Sie ein Dataset
query = 'SELECT * FROM [bigquery-public-data:samples.shakespeare] LIMIT 10'
(p3 | 'read' >> beam.io.Read(beam.io.BigQuerySource(project='PROJECTID', use_standard_sql=False, query=query))
    | 'write' >> beam.io.Write(beam.io.BigQuerySink(
        'testdataset.testtable1',
        schema='corpus_date:INTEGER, corpus:STRING, word:STRING, word_count:INTEGER',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
 )

p3.run().wait_until_finish()

Pipeline Teil 4

+----------------+
|                |
| Read BigQuery  |
|                |
+-------+--------+
        |
        v
+-------+--------+
|                |
| Modify Element |
|                |
+----------------+
        |
        v
+-------+--------+
|                |
| Write BigQuery |
|                |
+----------------+
def modify_data1(element):
    # beam.Map wird verwendet, um eine Zeile für eine Eingabezeile auszugeben
    # element = {u'corpus_date': 0, u'corpus': u'sonnets', u'word': u'LVII', u'word_count': 1}

    corpus_upper = element['corpus'].upper()
    word_len = len(element['word'])

    return {'corpus_upper': corpus_upper,
            'word_len': word_len
            }


p4 = beam.Pipeline(options=options)

query = 'SELECT * FROM [bigquery-public-data:samples.shakespeare] LIMIT 10'
(p4 | 'read' >> beam.io.Read(beam.io.BigQuerySource(project='PROJECTID', use_standard_sql=False, query=query))
    | 'modify' >> beam.Map(modify_data1)
    | 'write' >> beam.io.Write(beam.io.BigQuerySink(
        'testdataset.testtable2',
        schema='corpus_upper:STRING, word_len:INTEGER',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
 )

p4.run().wait_until_finish()

Pipeline Teil 5

Beispiel für das Teilen eines Zweigs

+----------------+
|                |
| Read BigQuery  |
|                |
+-------+--------+
        |
        +---------------------+
        |                     |
+-------v--------+    +-------v--------+
|                |    |                |
| Modify Element |    | Modify Element |
|                |    |                |
+-------+--------+    +-------+--------+
        |                     |
        +---------------------+
        |
+-------v--------+
|                |
| Flatten        |
|                |
+-------+--------+
        |
        |
+-------v--------+
|                |
| Save BigQuery  |
|                |
+----------------+
def modify1(element):
    # element = {u'corpus_date': 0, u'corpus': u'sonnets', u'word': u'LVII', u'word_count': 1}
    word_count = len(element['corpus'])
    count_type = 'corpus only'

    return {'word_count': word_count,
            'count_type': count_type
            }


def modify2(element):
    # element = {u'corpus_date': 0, u'corpus': u'sonnets', u'word': u'LVII', u'word_count': 1}
    word_count = len(element['word'])
    count_type = 'word only'

    return {'word_count': word_count,
            'count_type': count_type
            }


p5 = beam.Pipeline(options=options)

query = 'SELECT * FROM [bigquery-public-data:samples.shakespeare] LIMIT 10'
query_results = p5 | 'read' >> beam.io.Read(beam.io.BigQuerySource(
    project='PROJECTID', use_standard_sql=False, query=query))

#Übergeben Sie die BigQuery-Ergebnisse an zwei Zweige
branch1 = query_results | 'modify1' >> beam.Map(modify1)
branch2 = query_results | 'modify2' >> beam.Map(modify2)

#Reduzieren Sie die Ergebnisse aus dem Zweig
((branch1, branch2) | beam.Flatten()
                    | 'write' >> beam.io.Write(beam.io.BigQuerySink(
                        'testdataset.testtable3',
                        schema='word_count:INTEGER, count_type:STRING',
                        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
 )

p5.run().wait_until_finish()

Pipeline 6

Verwenden Sie ** Gruppieren nach **

def modify_data2(kvpair):
    #groupby übergibt ein Tupel von Schlüsseln und eine Liste von Daten mit diesen Schlüsseln
    # kvpair = (u'word only', [4, 4, 6, 6, 7, 7, 7, 7, 8, 9])

    return {'count_type': kvpair[0],
            'sum': sum(kvpair[1])
            }


p6 = beam.Pipeline(options=options)

query = 'SELECT * FROM [PROJECTID:testdataset.testtable3] LIMIT 20'
(p6 | 'read' >> beam.io.Read(beam.io.BigQuerySource(project='PROJECTID', use_standard_sql=False, query=query))
    | 'pair' >> beam.Map(lambda x: (x['count_type'], x['word_count']))
    | "groupby" >> beam.GroupByKey()
    | 'modify' >> beam.Map(modify_data2)
    | 'write' >> beam.io.Write(beam.io.BigQuerySink(
        'testdataset.testtable4',
        schema='count_type:STRING, sum:INTEGER',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
 )

p6.run().wait_until_finish()

Pipeline 7

Trennen Sie den Abschnitt ** Gruppieren nach ** mit ** Fenster **

def assign_timevalue(v):
    #Fügen Sie den Sammlungsdaten einen Zeitstempel hinzu
    #Das letztere Fenster wird basierend auf diesem Zeitstempel unterteilt
    #Hier werden Zeitstempel mit Zufallszahlen eingefügt.
    import apache_beam.transforms.window as window
    import random
    import time
    return window.TimestampedValue(v, int(time.time()) + random.randint(0, 1))


def modify_data3(kvpair):
    #groupby übergibt ein Tupel von Schlüsseln und eine Liste von Daten mit diesen Schlüsseln
    #Da es durch Fenster unterteilt ist, ist die Anzahl der Daten gering
    # kvpair = (u'word only', [4, 4, 6, 6, 7])

    return {'count_type': kvpair[0],
            'sum': sum(kvpair[1])
            }


p7 = beam.Pipeline(options=options)

query = 'SELECT * FROM [PROJECTID:testdataset.testtable3] LIMIT 20'
(p7 | 'read' >> beam.io.Read(beam.io.BigQuerySource(project='PROJECTID', use_standard_sql=False, query=query))
    | "assign tv" >> beam.Map(assign_timevalue)
    | 'window' >> beam.WindowInto(beam.window.FixedWindows(1))
    | 'pair' >> beam.Map(lambda x: (x['count_type'], x['word_count']))
    | "groupby" >> beam.GroupByKey()
    | 'modify' >> beam.Map(modify_data3)
    | 'write' >> beam.io.Write(beam.io.BigQuerySink(
        'testdataset.testtable5',
        schema='count_type:STRING, sum:INTEGER',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
 )

p7.run().wait_until_finish()

Recommended Posts

Cloud Dataflow Super Primer
Was ist Google Cloud Dataflow?
Dataflow Primer Wordcount und früher
Führen Sie XGBoost mit Cloud Dataflow (Python) aus.
Führen Sie Cloud Dataflow (Python) über AppEngine aus