[PYTHON] What is Google Cloud Dataflow?

This post is from WACUL Co., Ltd., where I work as CTO. Everyone at the company is having a good time baking bread, making curry, and watching movies.

I've been playing around with Google Cloud Dataflow lately, so I've organized my notes into this introduction. I hope it helps those who are about to touch it for the first time get a rough overview. I don't have deep expertise in streaming or batch processing, so if I've gotten something wrong, I'd love to hear about it.

What you can understand by reading this article

What you won't understand after reading this article

First of all, reference materials

Reference materials related to Google Cloud Dataflow that I have read so far.

What is Google Cloud Dataflow?

Cloud Dataflow is a large-scale data processing engine and its managed service. Broadly speaking, you can think of it as belonging to the same family as Hadoop, Spark, and so on. Its main features are a new programming model and a fully managed execution environment.

A programming model that integrates batch processing and streaming processing

The difference between batch processing and streaming processing is whether the data handled is finite (bounded) or infinite (unbounded). Because batch processing foundations have a long history and are more stable than streaming infrastructure, large-scale data processing has traditionally been designed around batch processing. On the other hand, the demand for faster business decisions and faster delivery of data to users keeps growing, and with it the demand for streaming engines that continuously process endless data.

This led to the Lambda Architecture, which combines batch processing and streaming processing to produce the final result. However, maintaining a system built on the Lambda Architecture can be a daunting task: you have to write the same logic in two different programming models and keep the two paths consistent with each other.

The programming model provided by Cloud Dataflow aims to unify finite and infinite data processing on a streaming foundation. To put it very roughly: if you have a model that processes newly arriving data accurately, and an engine that can do so in a realistic amount of time, then replaying all of the past data through the same pipeline will give you the same result, and that is good enough.

The problem with existing streaming engines was that it was difficult to guarantee the completeness of the data, mainly because the concept of time is hard to handle. The difficulty comes from the fact that, in reality, the time at which an event is actually processed often differs from the time at which it really occurred: network and processing delays, and in extreme cases, mobile app logs that are only sent to the server when the device comes back online. To deal with this, you need buffering and the ability to handle data that arrives in a different order than it was generated. Cloud Dataflow handles this well by introducing some new concepts (I don't understand them completely; see the reference material https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101 and https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102).
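To make the event-time idea a little more concrete, here is a minimal sketch of windowing, assuming the Dataflow Java SDK 1.x windowing API (words is assumed to be an unbounded PCollection of strings, and the window size and allowed lateness are arbitrary values for illustration):

// Group words into 1-minute windows based on event time (when the event occurred,
// not when it arrived). The watermark tracks how far event time has progressed;
// results are emitted once it passes the end of a window, and data arriving up to
// 10 minutes late is still accepted and folded into the result.
PCollection<KV<String, Long>> counts = words
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.standardMinutes(10))
        .accumulatingFiredPanes())
    .apply(Count.<String>perElement());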

This programming model was originally only available on GCP, but Google open-sourced it as the Apache Beam project around May 2016 (by the way, "Beam" is a combination of Batch and Stream).

Fully managed execution engine

Google Cloud Dataflow, Apache Spark, Apache Flink, Apache Apex, and others can be selected as the execution engine for Apache Beam, which optimizes each step of the pipeline (described later) and distributes the processing efficiently (I have only verified Cloud Dataflow). For on-premises use, Flink seems to be the strongest option.

So, to be precise, Cloud Dataflow is positioned as a fully managed service: the Apache Beam execution engine that runs on GCP. When you run a pipeline, GCE instances are launched behind the scenes. Considering the cost of maintaining infrastructure, I think it can be an attractive option, especially for startups.
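For reference, here is a sketch of the worker-related settings, based on my reading of the SDK 1.x DataflowPipelineOptions (the values are only examples, and these are hints to the service rather than infrastructure you manage yourself):

// Hints to the managed service about the GCE worker pool it launches
options.setNumWorkers(3);                      // initial number of worker instances
options.setWorkerMachineType("n1-standard-1"); // machine type of each worker
options.setZone("us-central1-b");              // zone in which to launch the workers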

Java or Python

You can choose Java or Python as the implementation language for Cloud Dataflow. However, as of November 2016 the Python version is still in beta, and some features may not be available.

That's roughly the situation. Support should continue to be fleshed out from here.

Let's take a look at the code

Let's take a look at the Java code. The sample code is organized in the GitHub repository linked from the Google documentation.

Let's take the Java 8 version of [MinimalWordCountJava8](https://github.com/GoogleCloudPlatform/DataflowJavaSDK-examples/blob/master/src/main/java8/com/google/cloud/dataflow/examples/MinimalWordCountJava8.java), which counts the words in its input, as an example.

Here is an excerpt of the main part (at each CHANGE comment you need to fill in your GCP project ID and GCS bucket name):

public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.create()
        .as(DataflowPipelineOptions.class);

    options.setRunner(BlockingDataflowPipelineRunner.class);

    // CHANGE 1 of 3: Your project ID is required in order to run your pipeline on the Google Cloud.
    options.setProject("SET_YOUR_PROJECT_ID_HERE");

    // CHANGE 2 of 3: Your Google Cloud Storage path is required for staging local files.
    options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");

    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
     .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
         .withOutputType(new TypeDescriptor<String>() {}))
     .apply(Filter.byPredicate((String word) -> !word.isEmpty()))
     .apply(Count.<String>perElement())
     .apply(MapElements
         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
         .withOutputType(new TypeDescriptor<String>() {}))

     // CHANGE 3 of 3: The Google Cloud Storage path is required for outputting the results to.
     .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));

    p.run();
}

Pipeline

From the feel of the code, you can see that Dataflow takes a functional style that defines the structure of the processing rather than operating on values directly. It's similar to Rx (Reactive Extensions) or TensorFlow.

A Pipeline is an object that controls the flow of processing, and it is created by passing options to Pipeline.create. The options specify the settings required to run the pipeline.
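Instead of hard-coding the settings as in the sample above, a common pattern (a sketch using the SDK's PipelineOptionsFactory.fromArgs) is to take them from command-line arguments:

// Parse options such as --project=..., --stagingLocation=..., --runner=...
// from the command line and validate that the required ones are present
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(DataflowPipelineOptions.class);

Pipeline p = Pipeline.create(options);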

Switching between batch mode and streaming mode

To switch from batch mode to streaming mode, change the line

options.setRunner(BlockingDataflowPipelineRunner.class);

to

options.setRunner(DataflowPipelineRunner.class);

In other words, both modes are processed by the same flow built on the Pipeline object.
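As a supplementary note (based on my reading of the SDK 1.x options, not something verified here), streaming jobs also set a flag on the options:

// Run the same pipeline as a streaming (unbounded) job
options.setRunner(DataflowPipelineRunner.class);
options.setStreaming(true);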

PCollection, PTransform

What we pass to the Pipeline's apply is a PTransform object, generated here with FlatMapElements.via and Filter.byPredicate together with Java 8 lambda expressions.

PTransform is an interface that defines a process that receives a PInput and returns a POutput. The pipeline does the work of running these transforms and passing the values between them. The transform at the start of the pipeline takes the PInput interface and the one at the end produces the POutput interface, while the intermediate steps handle instances of the PCollection class, which implements both. A PCollection is an object representing the data flowing through the pipeline, and the conversion, branching, and joining of that data are built on the Pipeline.

Example:

// Split each string in the collection from the previous step into words (and flatten the resulting lists)
input.apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
         .withOutputType(new TypeDescriptor<String>() {})) ...

// Keep only the non-empty strings from the previous step's collection
input.apply(Filter.byPredicate((String word) -> !word.isEmpty())) ...

// Aggregate the collection of strings from the previous step into a collection of per-element counts (KV<String, Long>)
input.apply(Count.<String>perElement()) ...
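When the built-in transforms are not enough, you can also write an element-wise transform yourself with ParDo and a DoFn. This is a minimal sketch assuming the Dataflow Java SDK 1.x API; lower-casing the words is just an illustration:

// Convert each word from the previous step to lower case
input.apply(ParDo.of(new DoFn<String, String>() {
    @Override
    public void processElement(ProcessContext c) {
        c.output(c.element().toLowerCase());
    }
})) ...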

Input / output

Currently supported input / output

(The Python version currently supports only text files and BigQuery.)

I/O is also a kind of PTransform, so you pass it to `apply` for processing.

Example:

// Read text files from Google Cloud Storage and convert them into a collection of strings, one per line
pipeline.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) ...

// Convert stream input from Cloud Pub/Sub into a collection with timestamps
pipeline.apply(PubsubIO.Read
          .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
          .subscription(options.getPubsubSubscription())) ...

// Receive a collection and write it to BigQuery
// (tableRef holds the destination table reference, schema holds the table's schema information)
... .apply(BigQueryIO.Write.to(tableRef)
            .withSchema(schema)) ...

Let's run

If you fill in your project and storage information in the sample code and run it, you can monitor the execution status from the Cloud Dataflow management screen.

(Screenshot: the Cloud Dataflow job monitoring screen)

The process defined on the Pipeline is executed in the cloud; behind the scenes, Compute Engine instances are launched to do the work. The output of this sample is written as multiple sharded files to the Cloud Storage bucket you created.

By the way, the input text for this sample looks like this:

GLOUCESTER      A poor unfortunate beggar.

EDGAR   As I stood here below, methought his eyes
        Were two full moons; he had a thousand noses,
        Horns whelk'd and waved like the enridged sea:
        It was some fiend; therefore, thou happy father,
        Think that the clearest gods, who make them honours
        Of men's impossibilities, have preserved thee.

GLOUCESTER      I do remember now: henceforth I'll bear
        Affliction till it do cry out itself
        'Enough, enough,' and die. That thing you speak of,
        I took it for a man; often 'twould say
        'The fiend, the fiend:' he led me to that place.

It's a Shakespeare play script. The output is a set of files with per-word counts in a format like this:

decreased: 1
'shall': 2
War: 4
empress': 14
Provost: 99
stoops: 6


Impressions

I ran a simple sample to get a rough feel for Google Cloud Dataflow. Here is what I felt:

What I don't know / What I want to try
