Bei Verwendung von Dataflow in Python wird das offizielle Beispiel Wortanzahlを読み解くためのメモ。
Wird nacheinander hinzugefügt.
import apache_beam as beam
import os
def first_pipeline():
with beam.Pipeline('DirectRunner') as pipeline:
#Lesen Sie die Datei,"input_data"Variable zuweisen
input_data = pipeline | 'ReadMyFile' >> beam.io.ReadFromText('./data/sample.txt')
# input_Schreiben Sie den Inhalt der Daten in eine Datei
input_data | beam.io.WriteToText('./data/output.txt')
if __name__ == '__main__':
#Ändern Sie das Arbeitsverzeichnis in den Speicherort der ausführbaren Datei
os.chdir(os.path.dirname(os.path.abspath(__file__)))
first_pipeline()
ParDo
Geben Sie nacheinander weitere Prozesse an.
import apache_beam as beam
import os
def run_pipeline():
with beam.Pipeline('DirectRunner') as pipeline:
# [Final Output PCollection] = ([Initial Input PCollection]
# | [First Transform]
# | [Second Transform]
# | [Third Transform])
input_data = (pipeline
| 'read_file' >> beam.io.ReadFromText('./data/sample.txt')
| 'add_hoge_string' >> beam.ParDo(lambda line: line + "hoge")
)
# input_Schreiben Sie den Inhalt der Daten in eine Datei
input_data | beam.io.WriteToText('./data/output.txt')
if __name__ == '__main__':
#Ändern Sie das Arbeitsverzeichnis in den Speicherort der ausführbaren Datei
os.chdir(os.path.dirname(os.path.abspath(__file__)))
run_pipeline()
Inhalt von sample.txt
Hello World
foo
bar
Ausgabe
H
e
l
l
o
W
o
r
l
d
h
o
g
e
f
o
o
h
o
g
e
b
a
r
h
o
g
e
Hoge wird am Ende jeder Zeile hinzugefügt, aber aus irgendeinem Grund wird auch ein Zeilenumbruch hinzugefügt