Building an ML pipeline using Google Cloud Dataflow (1)

Introduction

In this article, I describe how to build a machine learning (ML) pipeline on GCP.

The sample code for this article is available in the following repository: https://github.com/tonouchi510/dataflow-sample

Required GCP products

In a service that uses machine learning, you often want to run prediction (and retraining) and save the results every time new data is added. If the prediction requires pre-processing and post-processing that cannot be handled by the model's input and output alone, Cloud ML Engine by itself is not enough, and you need to combine it with Dataflow or similar services. In this series, we will build a pipeline covering the entire ML prediction process using Dataflow. Since there is a lot to cover, it is split into two parts.

Architecture

By combining GCP products, you can build a pipeline like the one shown in the figure below. This time, we aim to build the flow Pub/Sub -> Dataflow (pre-processing) -> ML Engine -> Dataflow (post-processing) -> GCS.

(Figure: overall pipeline architecture)

Build a pipeline for the following flow.

  1. Upload new data to GCS (this part)
  2. A notification is sent to Cloud Pub/Sub (this part)
  3. The deployed Dataflow job pulls the Pub/Sub topic at regular intervals (this part)
  4. Pre-processing in the Dataflow pipeline
  5. Request to ML Engine
  6. Post-processing in the Dataflow pipeline
  7. Save the results to GCS (this part)

Pipeline construction

Now, let's go through the build steps.

1. GCS bucket preparation

Prepare a bucket for uploading input data [1]. Here, create a bucket called dataflow-sample.

URL of the created bucket: gs://dataflow-sample
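
If you prefer to create the bucket from code instead of the console or gsutil, a minimal sketch using the google-cloud-storage Java client could look like the following (the location is an assumption; adjust it to your environment).

CreateBucket.java

import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class CreateBucket {
    public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        // "us-central1" is only an example location; pick the one you actually want.
        Bucket bucket = storage.create(BucketInfo.newBuilder("dataflow-sample")
                .setLocation("us-central1")
                .build());
        System.out.println("Created bucket: " + bucket.getName());
    }
}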

2. Pub/Sub settings

Create a Pub/Sub topic to receive the event notifications [2]. You can create it from the navigation menu of the GCP console; here, create a topic named gcs-notify. No subscriber is used this time, so you do not need to configure one.
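
Alternatively, the topic can be created programmatically. Below is a minimal sketch with the google-cloud-pubsub Java client, assuming the same project ID (dataflow-sample) that the pipeline code uses later.

CreateTopic.java

import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.ProjectTopicName;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        // The project ID is assumed to match your own project.
        ProjectTopicName topic = ProjectTopicName.of("dataflow-sample", "gcs-notify");
        try (TopicAdminClient client = TopicAdminClient.create()) {
            client.createTopic(topic);
            System.out.println("Created topic: " + topic);
        }
    }
}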

3. Pub/Sub Notifications for GCS settings

Set "Cloud Pub / Sub Notifications for Cloud Storage" in the bucket you created earlier so that you will be notified of events when new data is uploaded [3]. At the time of writing this article, you can receive the following types of events, but this time we want to trigger only the creation of a new file, so specify the option to set only OBJECT_FINALIZE.

(Figure: supported event types, from the Cloud Storage notifications documentation)

Also, since the results will be saved to the same bucket, restrict event monitoring to a specific folder (prefix) so that writing the results does not fire another notification. The command including the above settings is as follows.

$ gsutil notification create -t [TOPIC_NAME] -f json -p [folder] -e OBJECT_FINALIZE gs://[BUCKET_NAME]

# This time
$ gsutil notification create -t gcs-notify -f json -p data/ -e OBJECT_FINALIZE gs://dataflow-sample

To check the settings, type the following command.

$ gsutil notification list gs://[BUCKET_NAME]

Now, when a file is uploaded to the configured bucket, that information is sent to Pub/Sub and delivered to the gcs-notify topic as a message.
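
If you want to see what these messages look like before wiring up Dataflow, one option is to attach a temporary pull subscription to the topic and print the attributes. This is only a debugging sketch, not part of the sample repository; gcs-notify-debug is a hypothetical subscription that you would create separately for this check.

InspectNotification.java

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;

public class InspectNotification {
    public static void main(String[] args) throws Exception {
        // "gcs-notify-debug" is a hypothetical pull subscription on the gcs-notify topic.
        ProjectSubscriptionName subscription =
                ProjectSubscriptionName.of("dataflow-sample", "gcs-notify-debug");

        MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> {
            System.out.println("eventType: " + message.getAttributesOrDefault("eventType", ""));
            System.out.println("bucketId : " + message.getAttributesOrDefault("bucketId", ""));
            System.out.println("objectId : " + message.getAttributesOrDefault("objectId", ""));
            consumer.ack();
        };

        Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
        subscriber.startAsync().awaitRunning();
        Thread.sleep(60_000);  // listen for one minute, then shut down
        subscriber.stopAsync();
    }
}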

4. Create a Dataflow template

I want to execute a Dataflow job triggered by a Pub/Sub event, and there are several ways to do this:

- Set up a server (GAE, etc.) as a Pub/Sub subscriber that launches Dataflow jobs and keep it running: the job runs when triggered by a push notification from Pub/Sub.
- Create a job from a streaming Dataflow template that pulls the Pub/Sub topic at regular intervals: the job is always running.

This time, we take the approach of starting a streaming job from a Dataflow template.

About Dataflow [4]

The pipeline is implemented with Apache Beam, which is developed by Google, and Dataflow can be specified as its runner on GCP. Apache Beam itself refines the ideas of MapReduce, is easier to write, and can run on various runners. Dataflow in turn is scalable and has excellent distributed processing performance, so it is well worth learning.
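
For readers new to Apache Beam: a pipeline is just a chain of transforms applied to collections (PTransforms over PCollections). As an illustration only, here is a minimal batch word-count sketch in the style of the official examples; the input and output paths are placeholders, and running it locally assumes the DirectRunner dependency is on the classpath.

BeamWordCount.java

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

import java.util.Arrays;

public class BeamWordCount {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        p.apply("Read", TextIO.read().from("gs://dataflow-sample/data/input.txt"))      // placeholder path
         .apply("SplitWords", FlatMapElements
                 .into(TypeDescriptors.strings())
                 .via((String line) -> Arrays.asList(line.split("\\s+"))))
         .apply("CountWords", Count.perElement())
         .apply("Format", MapElements
                 .into(TypeDescriptors.strings())
                 .via((KV<String, Long> kv) -> kv.getKey() + ": " + kv.getValue()))
         .apply("Write", TextIO.write().to("gs://dataflow-sample/result/wordcount"));   // placeholder path

        p.run().waitUntilFinish();
    }
}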

Python SDK

At the time of writing, the Python SDK only supports Python 2.7, input from Pub/Sub did not work, and there were many other problems, so I do not recommend using Python for this.

I also started with Python, but for someone new to distributed processing the learning cost of Dataflow itself is high, and I repeatedly hit issues I could not resolve, so I gave up and switched to the more stable Java. After that, development went almost without problems.

Java SDK

The Java SDK is relatively stable and well documented. The official documentation uses Maven, but since I find it awkward to use, I build with Gradle instead.

Just add the following to the dependencies of build.gradle.

compile group: 'com.google.cloud.dataflow', name: 'google-cloud-dataflow-java-sdk-all', version: '2.5.0'

Pipeline implementation

In this article, we will pull messages from the Pub/Sub topic every 30 seconds, extract the path of the newly created image file from the message contents, crop the image, and save it to GCS. For an explanation of Apache Beam itself, please refer to the documentation (references [5] [6]).

The code looks like this:

PubsubToText.java



import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubsubToText {

    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.create();

        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
        dataflowOptions.setRunner(DataflowRunner.class);
        dataflowOptions.setProject("dataflow-sample");
        dataflowOptions.setStagingLocation("gs://dataflow-sample-bucket/staging");
        dataflowOptions.setTemplateLocation("gs://dataflow-sample-bucket/templates/MyTemplate");
        dataflowOptions.setStreaming(true);
        dataflowOptions.setNumWorkers(1);

        run(dataflowOptions);
    }

    public static PipelineResult run(DataflowPipelineOptions options) {
        String topic = "projects/dataflow-sample/topics/gcs-notify";
        String output = "gs://dataflow-sample-result/output.txt";

        Pipeline p = Pipeline.create(options);

        /*
         * Steps:
         *   1) Read messages (with attributes) from the Pub/Sub topic
         *   2) Window the messages into fixed 30-second intervals
         *   3) Crop the notified images and write the windowed output to GCS
         */
        p.apply("Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromTopic(topic))
                .apply( "30s Window",
                        Window.into(FixedWindows.of(Duration.standardSeconds(30))))
                .apply("Load Image", ParDo.of(new LoadImageFn()))
                .apply("Write File(s)", TextIO.write()
                        .withWindowedWrites()
                        .withNumShards(1)
                        .to(output));

        return p.run();
    }
}

The important points are to enable streaming with dataflowOptions.setStreaming(true) and to set a fixed-interval window when implementing the pipeline.
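
For reference, the windowing step can also be configured in more detail with triggers and allowed lateness. The following is only a hedged variant of the 30-second window used above (it is not part of the sample repository); it additionally emits data that arrives up to one minute late.

Windows.java

import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class Windows {
    // A 30-second fixed window that also fires for late data arriving within one minute.
    public static Window<PubsubMessage> fixedThirtySeconds() {
        return Window.<PubsubMessage>into(FixedWindows.of(Duration.standardSeconds(30)))
                .triggering(AfterWatermark.pastEndOfWindow()
                        .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardSeconds(30))))
                .withAllowedLateness(Duration.standardMinutes(1))
                .discardingFiredPanes();
    }
}

This transform could be applied in place of the plain "30s Window" step if late notifications matter for your use case.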

Messages from Cloud Pub/Sub Notifications for Cloud Storage have the following format. The pipeline extracts the necessary information from the message, reads the image from the notified GCS path, crops it, and saves it under the result directory.

(Figure: example notification message; its attributes include bucketId and objectId)

LoadImageFn.java



import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;

import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.awt.image.RasterFormatException;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

public class LoadImageFn extends DoFn<PubsubMessage, String> {

    @ProcessElement
    public void processElement(@Element PubsubMessage m, OutputReceiver<String> out) {
        Map<String, String> attr = m.getAttributeMap();

        // Read the notified object from GCS
        Storage storage = StorageOptions.getDefaultInstance().getService();
        BlobId blob = BlobId.of(attr.get("bucketId"), attr.get("objectId"));
        byte[] content = storage.readAllBytes(blob);
        InputStream is = new ByteArrayInputStream(content);

        BufferedImage img = null;
        try {
            img = ImageIO.read(is);
        } catch (IOException e) {
            e.printStackTrace();
        }
        if (img == null) {
            return;
        }

        // Top-left corner of the crop region
        int X = 50;
        int Y = 50;
        // Crop size
        int W = 100;
        int H = 100;

        BufferedImage subimg;  // cropped image
        try {
            subimg = img.getSubimage(X, Y, W, H);
        }
        catch ( RasterFormatException re ) {
            System.out.println("The specified range is outside the bounds of the image");
            return;
        }

        // Save the cropped image under the result/ directory of the same bucket
        BlobId blobId = BlobId.of(attr.get("bucketId"), "result/cropped_image.jpg");
        BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("image/jpeg").build();

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        BufferedOutputStream os = new BufferedOutputStream( bos );

        try {
            ImageIO.write(subimg, "jpeg", os);
            os.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }

        storage.create(blobInfo, bos.toByteArray());

        out.output(String.valueOf(img.getHeight()));
    }

}

By the way, the same approach works for videos by combining the GCS client with JavaCV (there are many situations where you want to run ffmpeg processing inside Dataflow).
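
As a very rough sketch of that idea (not part of the sample repository; the object name is hypothetical, and it assumes the org.bytedeco:javacv-platform dependency), frames could be grabbed from a video stored in GCS like this:

VideoFrameExample.java

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.bytedeco.javacv.FFmpegFrameGrabber;
import org.bytedeco.javacv.Frame;
import org.bytedeco.javacv.Java2DFrameConverter;

import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;

public class VideoFrameExample {
    public static void main(String[] args) throws Exception {
        // Read a video object from GCS ("data/sample.mp4" is a hypothetical name).
        Storage storage = StorageOptions.getDefaultInstance().getService();
        byte[] video = storage.readAllBytes(BlobId.of("dataflow-sample", "data/sample.mp4"));

        FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(new ByteArrayInputStream(video));
        grabber.start();

        Java2DFrameConverter converter = new Java2DFrameConverter();
        Frame frame;
        int count = 0;
        while ((frame = grabber.grabImage()) != null && count < 10) {
            BufferedImage img = converter.convert(frame);
            // ... crop / preprocess each frame in the same way as LoadImageFn ...
            System.out.println("frame " + count + ": " + img.getWidth() + "x" + img.getHeight());
            count++;
        }
        grabber.stop();
    }
}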

Now, when you run the program locally, the template file is created at the GCS path specified by dataflowOptions.setTemplateLocation("gs://dataflow-sample-bucket/templates/MyTemplate").

$ gradle run

5. Create a Dataflow job from the template

Procedure

- Go to the GCP console and select Dataflow from the navigation menu
- Select "Create Job from Template"
- Select "Custom Template" as the template type and enter the GCS path of the Dataflow template file you just created
- Enter a job name and run the job
- The setup is complete when the pipeline is built correctly and looks like the figure below

(Figure: the Dataflow job graph shown in the console)

Check that the job runs when you upload a new file to GCS. Since the streaming job pulls the topic every 30 seconds, you may need to wait a moment.

When the job execution finishes, confirm that the cropped image has been created in GCS. Because this is a streaming Dataflow job, it does not shut down after processing a single input; if you upload another file, the job runs again. Please verify this behavior.
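
For the operation check, uploading with gsutil is of course fine, but a small Java snippet that drops a local JPEG under the monitored data/ prefix works just as well (the local file name test.jpg is arbitrary).

UploadTestImage.java

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

import java.nio.file.Files;
import java.nio.file.Paths;

public class UploadTestImage {
    public static void main(String[] args) throws Exception {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        byte[] bytes = Files.readAllBytes(Paths.get("test.jpg"));  // any local JPEG

        // Upload under the monitored data/ prefix so that an OBJECT_FINALIZE event fires.
        BlobId blobId = BlobId.of("dataflow-sample", "data/test.jpg");
        BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("image/jpeg").build();
        storage.create(blobInfo, bytes);

        System.out.println("Uploaded gs://dataflow-sample/data/test.jpg");
    }
}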

Summary

After a new file was uploaded to GCS, Pub/Sub was notified, the Dataflow job ran, and the results were saved back to GCS.

In the Dataflow pipeline, I implemented the process of reading the image from the path contained in the "Cloud Pub/Sub Notifications for Cloud Storage" message and saving the result to GCS. If we now add pipeline steps for pre-processing, prediction, and post-processing, the whole ML pipeline will be complete. Since that requires ML Engine and other services, I will write about building those parts another time.

References

[1] https://cloud.google.com/storage/docs/
[2] https://cloud.google.com/pubsub/docs/
[3] https://cloud.google.com/storage/docs/pubsub-notifications
[4] https://cloud.google.com/dataflow/docs/
[5] https://beam.apache.org/documentation/
[6] https://beam.apache.org/documentation/programming-guide/
