https://beam.apache.org/documentation/transforms/python/elementwise/pardo/
Have a look at the official documentation above for details.
ParDo is a general-purpose transform for parallel processing.
ParDo applies arbitrary user-defined processing to each element of the input PCollection and emits the results as an output PCollection.
It is used like this.
ParDo1.py
```python
import apache_beam as beam

class SplitWords(beam.DoFn):
    def process(self, element):
        return [element.split()]

with beam.Pipeline() as p:
    words = (
        p
        | 'Create Data' >> beam.Create([
              'Cloud Dataflow is a distributed parallel processing service.',
              'BigQuery is a powerful Data WareHouse.',
              'Cloud Pub/Sub is a scalable messaging service.'])
        | 'Split into words' >> beam.ParDo(SplitWords())
        | 'Print' >> beam.Map(print)
    )
```
The processing performed by ParDo is implemented in a class that inherits from `beam.DoFn`, with the actual logic written in its `process` method.
Since the `element` argument is a single record of the PCollection, `split()` is called on that string here.
Finally, `return` returns the processing result as a list. It may also be produced with

```python
yield element.split()
```

instead of `return`.
The output looks like this.
['Cloud', 'Dataflow', 'is', 'a', 'distributed', 'parallel', 'processing', 'service.']
['BigQuery', 'is', 'a', 'powerful', 'Data', 'WareHouse.']
['Cloud', 'Pub/Sub', 'is', 'a', 'scalable', 'messaging', 'service.']
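As a quick pure-Python check (outside of any pipeline), `return` with a list and `yield` are interchangeable here, since `process` only needs to produce an iterable of output elements. `process_return` and `process_yield` are illustrative names, not part of the Beam API:

```python
# Standalone sketch: both styles produce an iterable with one output element.
def process_return(element):
    return [element.split()]

def process_yield(element):
    yield element.split()

line = 'BigQuery is a powerful Data WareHouse.'
print(list(process_return(line)) == list(process_yield(line)))  # True
```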
DoFn.setup()
Called once when the `DoFn` instance is initialized. It can be called more than once per worker. A good place to open database or network connections.
DoFn.start_bundle()
Called once per bundle of elements, before `process` is called on the first `element`. Good for tracking the start of processing for a bundle.
DoFn.process(element, *args, **kwargs)
Called for each `element`, producing zero or more elements. It can receive `*args` and `**kwargs` passed through the arguments of `ParDo`.
DoFn.finish_bundle()
Called once per bundle, after `process` has been called on the last `element`, producing zero or more elements. A good place for batch work at the end of a bundle, such as running a database query.
For example, one pattern is to initialize a batch in `start_bundle`, append each element to the batch in `process` instead of using `return` or `yield`, and finally run a query in `finish_bundle` and emit the result.
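The call order of these lifecycle methods can be sketched with a plain-Python model of how a runner might drive a DoFn. Note that `BatchingDoFn` and `run_bundles` are illustrative names for this sketch, not how Beam actually invokes the methods internally:

```python
# Simplified model of the DoFn lifecycle: setup -> (start_bundle ->
# process* -> finish_bundle)* -> teardown.
class BatchingDoFn:
    def setup(self):             # once per instance: open connections here
        self.results = []
    def start_bundle(self):      # once per bundle: initialize the batch
        self.batch = []
    def process(self, element):  # per element: accumulate instead of yielding
        self.batch.append(element)
    def finish_bundle(self):     # once per bundle: "run the query" on the batch
        self.results.append(sum(self.batch))
    def teardown(self):          # best effort: close connections here
        pass

def run_bundles(dofn, bundles):
    # Calls the lifecycle methods in the order documented above.
    dofn.setup()
    for bundle in bundles:
        dofn.start_bundle()
        for element in bundle:
            dofn.process(element)
        dofn.finish_bundle()
    dofn.teardown()
    return dofn.results

print(run_bundles(BatchingDoFn(), [[1, 2, 3], [10, 20]]))  # [6, 30]
```

Here each inner list plays the role of one bundle, so `finish_bundle` runs once per bundle and emits one batched result.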
DoFn.teardown()
Called once per instance when it is shut down. (Note, however, that this is best effort: if a worker crashes, it will not be executed.) Ideal for closing database and network connections.
Information held by each element, such as its timestamp (called the event time) and the start and end times of its window, can be obtained as follows.
```python
import apache_beam as beam

class AnalyzeElement(beam.DoFn):
    def process(self, elem,
                timestamp=beam.DoFn.TimestampParam,
                window=beam.DoFn.WindowParam):
        yield '\n'.join([
            '# timestamp',
            'type(timestamp) -> ' + repr(type(timestamp)),
            'timestamp.micros -> ' + repr(timestamp.micros),
            'timestamp.to_rfc3339() -> ' + repr(timestamp.to_rfc3339()),
            'timestamp.to_utc_datetime() -> ' + repr(timestamp.to_utc_datetime()),
            '',
            '# window',
            'type(window) -> ' + repr(type(window)),
            'window.start -> {} ({})'.format(
                window.start, window.start.to_utc_datetime()),
            'window.end -> {} ({})'.format(
                window.end, window.end.to_utc_datetime()),
            'window.max_timestamp() -> {} ({})'.format(
                window.max_timestamp(), window.max_timestamp().to_utc_datetime()),
        ])
```