https://beam.apache.org/documentation/transforms/python/elementwise/pardo/
[Schau dir das an.
ParDo ist eine universelle Verarbeitungsfunktion für Tranform parallel.
Wenn eine PCollection, bei der es sich um einen Datenblock handelt, eingegeben wird, wendet ParDo eine vom Benutzer implementierte willkürliche Verarbeitung auf jedes Element an und konvertiert das Verarbeitungsergebnis in eine Ausgabe-PCollection.
Die Verwendung ist wie folgt.
ParDo1.py
class SplitWords(beam.DoFn):
def process(self, element):
return [element.split()]
with beam.Pipeline() as p:
len = (
p
| 'Create Data' >> beam.Create(['Cloud Dataflow is a distributed parallel processing service.',
'BigQuery is a powerful Data WareHouse.',
'Cloud Pub/Sub is a scalable messaging service.'])
| 'Split into words' >> beam.ParDo(SplitWords())
| 'Print' >> beam.Map(print)
)
Der von ParDo auszuführende Prozess ist in der Klasse implementiert, die "beam.DoFn "erbt. Schreiben Sie den eigentlichen Prozess in die Methode "process". Da das als Argument verwendete Element zu jedem Datensatz von PCollection wird, wird hier split () für diese Zeichenfolge aufgerufen. Schließlich gibt return das Verarbeitungsergebnis als Array zurück.
yield element.split()
Kann mit zurückgegeben werden
Die Ausgabe sieht so aus.
['Cloud', 'Dataflow', 'is', 'a', 'distributed', 'parallel', 'processing', 'service.']
['BigQuery', 'is', 'a', 'powerful', 'Data', 'WareHouse.']
['Cloud', 'Pub/Sub', 'is', 'a', 'scalable', 'messaging', 'service.']
DoFn.setup()
DoFn
Wird einmal pro Instanz aufgerufen, wenn die Instanz initialisiert wird. Es kann mehr als einmal pro Arbeiter aufgerufen werden. Hier ist es gut, die Verbindungsverarbeitung der Datenbank und des Netzwerks durchzuführen.
DoFn.start_bundle() Wird einmal für Teile von Elementen aufgerufen. Wird aufgerufen, bevor "process" für das erste "Element" aufgerufen wird. Gut geeignet, um den Beginn der Verarbeitung von Elementblöcken zu verfolgen.
DoFn.process(element, *args, **kwargs)
Wird für jedes "Element" aufgerufen. Erzeugt 0 oder mehr Elemente
s. Sie können * args
und ** kwargs
als Argumente über die ParDo-Argumente verwenden.
DoFn.finish_bundle()
Wird einmal für Teile von Elementen aufgerufen. Wird aufgerufen, nachdem "process" für das letzte "Element" aufgerufen wurde. Erzeugt 0 oder mehr Elemente
s. Ein guter Ort, um einen Stapel am Ende eines Blocks auszuführen, z. B. eine Datenbankabfrage.
Initialisieren Sie beispielsweise einen Stapel mit "start_bundle", fügen Sie dem Stapel ein Element hinzu, anstatt mit "process" zurückzukehren oder nachzugeben, und führen Sie schließlich eine Abfrage mit "finish_bundle" aus, um das Ergebnis auszugeben. Wie benutzt man.
DoFn.teardown() DoFn Wird einmal pro Instanz aufgerufen, wenn die Instanz beendet wird. (Dies scheint jedoch die beste Anstrengung zu sein. Dies bedeutet, dass es nicht ausgeführt wird, wenn der Worker abstürzt.) Beachten Sie dies. Ideal zum Schließen von Datenbank- und Netzwerkverbindungen.
Informationen wie die Zeit (event_time genannt) des Elements sowie die Start- und Endzeit des Fensters können wie folgt abgerufen werden.
class AnalyzeElement(beam.DoFn):
def process(self, elem,
timestamp=beam.DoFn.TimestampParam,
window=beam.DoFn.WindowParam):
yield '\n'.join(['# timestamp',
'type(timestamp) -> ' + repr(type(timestamp)),
'timestamp.micros -> ' + repr(timestamp.micros),
'timestamp.to_rfc3339() -> ' + repr(timestamp.to_rfc3339()),
'timestamp.to_utc_datetime() -> ' + repr(timestamp.to_utc_datetime()),
'',
'# window',
'type(window) -> ' + repr(type(window)),
'window.start -> {} ({})'.format(
window.start, window.start.to_utc_datetime()),
'window.end -> {} ({})'.format(
window.end, window.end.to_utc_datetime()),
'window.max_timestamp() -> {} ({})'.format(
window.max_timestamp(), window.max_timestamp().to_utc_datetime()),
])
Recommended Posts