Dataflow memo: ParDo edition

Reference

https://beam.apache.org/documentation/transforms/python/elementwise/pardo/

What is Dataflow?

Have a look at this.

What is ParDo?

ParDo is a general-purpose transform for processing elements in parallel.

ParDo applies user-defined processing to each element of the input PCollection (a collection of data) and emits the results as an output PCollection.

The usage is like this.

ParDo1.py


import apache_beam as beam


class SplitWords(beam.DoFn):

    def process(self, element):
        # Split each input string into a list of words.
        return [element.split()]


with beam.Pipeline() as p:
    words = (
        p
        | 'Create Data' >> beam.Create(['Cloud Dataflow is a distributed parallel processing service.',
                                        'BigQuery is a powerful Data WareHouse.',
                                        'Cloud Pub/Sub is a scalable messaging service.'])
        | 'Split into words' >> beam.ParDo(SplitWords())
        | 'Print' >> beam.Map(print)
    )

The processing performed by ParDo is implemented in a class that inherits from beam.DoFn, and the actual logic is written in the process method. The `element` argument is a single record of the PCollection, so split() is called on that string here. Finally, return returns the processing result as a list.

The result may also be returned with yield:

yield element.split()

The output looks like this.

['Cloud', 'Dataflow', 'is', 'a', 'distributed', 'parallel', 'processing', 'service.']
['BigQuery', 'is', 'a', 'powerful', 'Data', 'WareHouse.']
['Cloud', 'Pub/Sub', 'is', 'a', 'scalable', 'messaging', 'service.']

ParDo methods

DoFn.setup() Called once per DoFn instance when the instance is initialized. It can be called more than once per worker. This is a good place to set up database and network connections.

DoFn.start_bundle() Called once per bundle of elements, before process is called on the first element of the bundle. Useful for tracking the start of processing a bundle of elements.

DoFn.process(element, *args, **kwargs) Called for each element. Produces zero or more elements. Extra *args and **kwargs can be passed in through the arguments of ParDo.
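For example, extra arguments given to beam.ParDo are forwarded to process. A minimal sketch (the MultiplyBy class and the factor of 10 are made up for illustration):

import apache_beam as beam


class MultiplyBy(beam.DoFn):

    def process(self, element, factor):
        # `factor` arrives via the extra argument passed to beam.ParDo below.
        yield element * factor


with beam.Pipeline() as p:
    (p
     | beam.Create([1, 2, 3])
     | beam.ParDo(MultiplyBy(), 10)  # each element is multiplied by 10
     | beam.Map(print))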

DoFn.finish_bundle() Called once per bundle of elements, after process has been called on the last element of the bundle. Produces zero or more elements. A good place to run batched work at the end of a bundle, such as executing a database query.

How to use start_bundle and finish_bundle

For example, initialize a batch in start_bundle, add each element to the batch in process instead of returning or yielding it, and finally run a query over the batch in finish_bundle and print the result, as in the sketch below.
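A minimal sketch of that pattern (the batched work here is just a print, standing in for a real database query):

import apache_beam as beam


class BatchedQuery(beam.DoFn):

    def start_bundle(self):
        # A new, empty batch is created for every bundle of elements.
        self.batch = []

    def process(self, element):
        # Accumulate elements into the batch instead of yielding them.
        self.batch.append(element)

    def finish_bundle(self):
        # Run one batched operation per bundle, e.g. a database query;
        # here we just print how many elements the bundle contained.
        print('processing a batch of {} elements'.format(len(self.batch)))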

DoFn.teardown() Called once per DoFn instance when it is torn down. (Note that this is best effort: if a worker crashes, it will not be executed.) Ideal for closing database and network connections.
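A minimal sketch of managing a connection with setup and teardown (an in-memory sqlite3 database is used here purely for illustration):

import sqlite3

import apache_beam as beam


class WriteToDb(beam.DoFn):

    def setup(self):
        # Open the connection once per DoFn instance
        # (this may happen more than once per worker).
        self.conn = sqlite3.connect(':memory:')
        self.conn.execute('CREATE TABLE IF NOT EXISTS words (word TEXT)')

    def process(self, element):
        self.conn.execute('INSERT INTO words VALUES (?)', (element,))
        yield element

    def teardown(self):
        # Best effort: may not run if the worker crashes.
        self.conn.close()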

Use Timestamp and Window information

Information such as the event time attached to each element (event_time) and the start and end times of its window can be obtained as follows.

import apache_beam as beam


class AnalyzeElement(beam.DoFn):
  def process(self, elem, 
              timestamp=beam.DoFn.TimestampParam,
              window=beam.DoFn.WindowParam):
    yield '\n'.join(['# timestamp',
                     'type(timestamp) -> ' + repr(type(timestamp)),
                     'timestamp.micros -> ' + repr(timestamp.micros),
                     'timestamp.to_rfc3339() -> ' + repr(timestamp.to_rfc3339()),
                     'timestamp.to_utc_datetime() -> ' + repr(timestamp.to_utc_datetime()),
                     '',
                     '# window',
                     'type(window) -> ' + repr(type(window)),
                     'window.start -> {} ({})'.format(
                         window.start, window.start.to_utc_datetime()),
                     'window.end -> {} ({})'.format(
                         window.end, window.end.to_utc_datetime()),
                     'window.max_timestamp() -> {} ({})'.format(
                         window.max_timestamp(), window.max_timestamp().to_utc_datetime()),
    ])
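To see meaningful timestamps and windows, the elements need event timestamps and the pipeline needs a windowing transform. A minimal sketch using the AnalyzeElement class above (the sample timestamps and the 30-second window size are arbitrary):

import apache_beam as beam
from apache_beam.transforms import window


with beam.Pipeline() as p:
    (p
     | beam.Create([('event-a', 10), ('event-b', 40)])
     # Attach an event timestamp (seconds since the epoch) to each element.
     | beam.Map(lambda kv: window.TimestampedValue(kv[0], kv[1]))
     # Assign elements to fixed 30-second windows.
     | beam.WindowInto(window.FixedWindows(30))
     | beam.ParDo(AnalyzeElement())
     | beam.Map(print))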
