[PYTHON] Dataflow Primer Wordcount und früher

Überblick

Bei Verwendung von Dataflow in Python wird das offizielle Beispiel Wortanzahlを読み解くためのメモ。

Wird nacheinander hinzugefügt.

Datei IO

import apache_beam as beam
import os


def first_pipeline():
    with beam.Pipeline('DirectRunner') as pipeline:
        #Lesen Sie die Datei,"input_data"Variable zuweisen
        input_data = pipeline | 'ReadMyFile' >> beam.io.ReadFromText('./data/sample.txt')
        # input_Schreiben Sie den Inhalt der Daten in eine Datei
        input_data | beam.io.WriteToText('./data/output.txt')


if __name__ == '__main__':
    #Ändern Sie das Arbeitsverzeichnis in den Speicherort der ausführbaren Datei
    os.chdir(os.path.dirname(os.path.abspath(__file__)))
    first_pipeline()

ParDo

Geben Sie nacheinander weitere Prozesse an.


import apache_beam as beam
import os

def run_pipeline():
    with beam.Pipeline('DirectRunner') as pipeline:

        # [Final Output PCollection] = ([Initial Input PCollection]
        #                               | [First Transform]
        #                               | [Second Transform]
        #                               | [Third Transform])
        input_data = (pipeline
        | 'read_file' >> beam.io.ReadFromText('./data/sample.txt')
        | 'add_hoge_string' >> beam.ParDo(lambda line: line + "hoge")
        )

        # input_Schreiben Sie den Inhalt der Daten in eine Datei
        input_data | beam.io.WriteToText('./data/output.txt')


if __name__ == '__main__':
    #Ändern Sie das Arbeitsverzeichnis in den Speicherort der ausführbaren Datei
    os.chdir(os.path.dirname(os.path.abspath(__file__)))
    run_pipeline()




Inhalt von sample.txt

Hello World
foo
bar

Ausgabe

H
e
l
l
o
 
W
o
r
l
d
h
o
g
e
f
o
o
h
o
g
e
b
a
r
h
o
g
e

Hoge wird am Ende jeder Zeile hinzugefügt, aber aus irgendeinem Grund wird auch ein Zeilenumbruch hinzugefügt

Recommended Posts

Dataflow Primer Wordcount und früher
Cloud Dataflow Super Primer