This is an introductory article, as the title suggests. We'll write the code in Java instead of Python, use streaming processing instead of batch, and run the pipeline on Dataflow instead of locally.
In this article, we'll create a Dataflow pipeline that receives data from a PubSub topic, does some simple processing, and then writes the data to another PubSub topic.
First, create the required GCP resources: a GCS bucket (used for staging and temp files) and two PubSub topics (one as the source and one as the destination).
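These can be created from the GCP web console, or, for example, with the gsutil / gcloud CLI. The names below are placeholders matching the code later in this article; this is just one way to do it.
$ gsutil mb gs://[YOUR GCS BUCKET]
$ gcloud pubsub topics create [PUBSUB TOPIC 1]
$ gcloud pubsub topics create [PUBSUB TOPIC 2]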
Maven is used here for dependency management; add the following to your pom.xml.
<dependency>
  <groupId>com.google.cloud.dataflow</groupId>
  <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
  <version>2.5.0</version>
</dependency>
Add "Hello," to the data received from PubSub and put it in another PubSub.
AddHello.java
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
public class AddHello {

    // GCP project and GCS locations used by the Dataflow runner for staging and temp files.
    private static final String PROJECT = "[YOUR PROJECT]";
    private static final String STAGING_LOCATION = "gs://[YOUR GCS BUCKET]/staging";
    private static final String TEMP_LOCATION = "gs://[YOUR GCS BUCKET]/temp";

    // Source and destination PubSub topics.
    private static final String SRC_PUBSUB_TOPIC = "projects/[YOUR PROJECT]/topics/[PUBSUB TOPIC 1]";
    private static final String DST_PUBSUB_TOPIC = "projects/[YOUR PROJECT]/topics/[PUBSUB TOPIC 2]";

    // Prepends "Hello," to every element.
    static class MyFn extends DoFn<String, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output("Hello," + c.element());
        }
    }

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();

        // Configure the pipeline to run on the Dataflow service.
        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
        dataflowOptions.setRunner(DataflowRunner.class);
        dataflowOptions.setProject(PROJECT);
        dataflowOptions.setStagingLocation(STAGING_LOCATION);
        dataflowOptions.setTempLocation(TEMP_LOCATION);
        dataflowOptions.setNumWorkers(1);

        // Read from the source topic, apply the transform, and write to the destination topic.
        Pipeline p = Pipeline.create(dataflowOptions);
        p.apply(PubsubIO.readStrings().fromTopic(SRC_PUBSUB_TOPIC))
         .apply(ParDo.of(new MyFn()))
         .apply(PubsubIO.writeStrings().to(DST_PUBSUB_TOPIC));
        p.run();
    }
}
Set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the path of your service account key (/path/to/xxxxx.json) and run the code above.
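For example, assuming the class is built with Maven and the exec-maven-plugin is available, the job can be launched like this (a sketch, not the only way to run it):
$ export GOOGLE_APPLICATION_CREDENTIALS=/path/to/xxxxx.json
$ mvn compile exec:java -Dexec.mainClass=AddHello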
Open Dataflow in the GCP web console and verify that the job has been deployed.
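You can also check from the command line, assuming the gcloud CLI is set up for your project:
$ gcloud dataflow jobs list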
Publish a message to the source PubSub topic. This can be done from the GCP web console. The Dataflow job may not be ready to process data immediately after deployment, so it may be a good idea to wait a little while.
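Alternatively, you can publish a test message with the gcloud CLI (the message body "World" here is just an example):
$ gcloud pubsub topics publish [PUBSUB TOPIC 1] --message="World"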
Create a subscription (tentatively called my-subscription) on the destination PubSub topic and pull the data.
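If you prefer the CLI to the web console, the subscription can be created like this:
$ gcloud pubsub subscriptions create my-subscription --topic=[PUBSUB TOPIC 2]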
$ gcloud pubsub subscriptions pull my-subscription --auto-ack