Development of Flink using DataStream API

In this article, we will review the basics of distributed stream processing and explore development with Flink's DataStream API as an example.

Basic concepts of stream processing

The definition of stream processing may vary. Conceptually, stream processing and batch processing are two sides of the same coin: a Java ArrayList, for example, can be treated either as a finite dataset accessed by index or as a stream of elements accessed through an iterator.


Figure 1. The left side is a coin classifier

A coin classifier can be described as a stream processing system. All the components used to classify coins are connected in series in advance. Coins continuously enter the system and are output to different queues for later use. The same applies to the photo on the right.

Stream processing systems share several characteristics. To support the processing of infinite datasets, they typically employ a data-driven processing model: the operators are set up in advance, and the data is then processed as it arrives. To represent complex computational logic, distributed stream processing engines, including Flink, typically use a DAG to represent the entire computation.

Each node on the DAG represents an operator, the basic logical unit of computation. The computational logic is organized into a directed graph in which data flows into the system from the edges via special source nodes. The data is transmitted between operators via different transport methods, such as network transmission and local transmission, and is processed along the way. Finally, the results are sent to external systems or databases via other special sink nodes.


Figure 2. Logical DAG computation graph and the actual runtime physical model.

For a distributed stream processing engine, the actual runtime physical model is more complicated, because each operator can have multiple instances: each operator in the logical graph corresponds to multiple concurrent instances in the physical graph. As shown in Figure 2, the source operator A has two instances, and the intermediate operator C also has two instances.

In the logical model, A and B are upstream nodes of C, and in the corresponding physical model, there can be data exchanges between all instances of C, A, and B.

When operator instances are distributed across different processes, data is carried between them over the network. Data transfer between multiple instances in the same process usually does not need to go through the network.

Table 1 A DAG computation graph constructed with Apache Storm. The Apache Storm API is "operation oriented" and therefore relatively low level.

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

Table 2 A DAG computation graph constructed with Apache Flink. The Apache Flink API is more "data oriented" and therefore higher level.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("input");
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);
counts.writeAsText("output");

Since DAG graphs represent the computational logic of stream processing, most APIs are designed around the construction of this computational logic graph. Table 1 shows an example of Apache Storm's WordCount, which was popular a few years ago.

With Apache Storm, you add Spout and Bolt operators to the graph and specify how they connect to each other. After the entire graph is built in this way, it is submitted for execution in a remote or local cluster.

In contrast, the Apache Flink API also builds a computational logic graph, but Flink's API definitions are oriented more toward the data processing logic. Flink abstracts the data stream as an infinite set, defines a group of operations on that set, and automatically builds the corresponding DAG graph at the lower layer.

As a result, the Flink API is at a higher level. Many researchers may prefer Storm's high flexibility for experimentation, because it makes it easier to obtain exactly the expected graph structure. However, the industry as a whole prefers higher-level APIs like Flink's because they are easier to use.

Overview of Flink DataStream API

Based on the basic ideas of stream processing above, let's look in detail at how to use the Flink DataStream API. We start with a simple example. Table 3 shows a streaming WordCount example. It has only five lines of code, but it provides the basic structure for developing programs based on the Flink DataStream API.

Table 3 WordCount example based on the Flink DataStream API

// 1. Set the runtime environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. Configure the data source to read data
DataStream<String> text = env.readTextFile("input");
// 3. Perform a series of transformations
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);
// 4. Configure the Sink to write data out
counts.writeAsText("output");
// 5. Submit for execution
env.execute("Streaming WordCount");

To implement the streaming WordCount, first obtain a StreamExecutionEnvironment object. This is the context object in which the graph is built, and operators are added based on it. For a stream processing program, a data source must be created to access the data; this example uses a built-in data source provided by the Environment object to read a file.

You then get a DataStream object, which represents an infinite dataset, and perform a series of operations on it. For example, in the WordCount example, each record (that is, one line of the file) is first split into words, which is implemented with the FlatMap operation.

Calling FlatMap adds an operator to the underlying DAG graph. Then, on the resulting stream of words, the words are grouped (KeyBy) and the count for each word is accumulated (sum(1)). The computed word counts form a new stream, which is written to the output file.
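The Tokenizer passed to flatMap above is not shown in the article. A minimal sketch of what such a FlatMapFunction could look like (an assumption, splitting each line on non-word characters and emitting (word, 1) pairs) is:

// Hypothetical Tokenizer: splits each line into words and emits (word, 1) pairs.
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
        for (String word : line.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

FlatMapFunction comes from org.apache.flink.api.common.functions, Tuple2 from org.apache.flink.api.java.tuple, and Collector from org.apache.flink.util.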

Finally, call the env#execute method to start executing the program. Note that none of the methods called earlier actually process any data; they only build the DAG graph that represents the computational logic.

Only after the entire graph is built and the Execute method is explicitly called does the framework submit the computation graph to the cluster, access the data, and execute the logic.

The streaming WordCount example shows that writing a stream processing program based on the Flink DataStream API generally requires three steps: accessing, processing, and writing out data.

Finally, call the Execute method.


Figure 3. Overview of Flink DataStream operations.

As you can see from the previous example, the core of the Flink DataStream API is a DataStream object that represents streaming data. The entire computational logic graph is built on the basis of invoking different operations on the DataStream object to create a new DataStream object.

In general, there are four types of operations on DataStreams. The first type is single-record operations, such as filtering out unwanted records (Filter) and converting each record (Map). The second type is multi-record operations. For example, to count the total volume of orders within an hour, all order records within that hour must be added together. To support this type of operation, the required records are combined through a window for processing.
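As a rough sketch of these first two types (assuming a hypothetical DataStream<Tuple2<String, Integer>> named orders holding (item type, volume) records, with imports omitted as in the tables above):

// Single-record operation: Filter keeps only the records we care about.
DataStream<Tuple2<String, Integer>> largeOrders = orders.filter(order -> order.f1 >= 10);

// Multi-record operation: group by item type and sum the volume within each one-hour window.
DataStream<Tuple2<String, Integer>> hourlyTotals = orders
    .keyBy(0)
    .timeWindow(Time.hours(1))
    .sum(1);

Here Time is org.apache.flink.streaming.api.windowing.time.Time, and timeWindow is the shorthand available in the Flink versions this article targets.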

The third type is operations on multiple streams that convert them into a single stream. For example, multiple streams can be merged with operations such as Union, Join, and Connect. These operations merge the streams using different logic, but they eventually produce a new, unified stream, which enables cross-stream operations.

The fourth type is the split operation, which is supported by DataStream and is the opposite of the merge operations. These operations split a stream into multiple streams according to some rule, and each resulting stream is a subset of the original stream.
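A sketch of merging and splitting (assuming two hypothetical DataStream<String> objects, streamA and streamB; split/select is the mechanism described in this article, while newer Flink versions use side outputs instead):

// Merge: union combines streams of the same type into a single stream.
DataStream<String> merged = streamA.union(streamB);

// Split: route each record to a named sub-stream, then select one of the sub-streams.
SplitStream<String> split = merged.split(new OutputSelector<String>() {
    @Override
    public Iterable<String> select(String value) {
        return Collections.singletonList(value.startsWith("error") ? "errors" : "normal");
    }
});
DataStream<String> errors = split.select("errors");

OutputSelector lives in org.apache.flink.streaming.api.collector.selector, and Collections in java.util.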


Figure 4. Different types of DataStream subtypes. Different subtypes support different sets of operations.

To support these different stream operations, Flink introduces a set of different stream types to represent the intermediate stream datasets. Figure 4 shows the complete set of type conversion relationships.

For single-record operations such as Map, the result is still of type DataStream. A Split operation produces a SplitStream; based on the SplitStream, the Select method is used to filter out the desired records and obtain a basic stream again.

Similarly, the Connect operation returns a dedicated ConnectedStreams object after calling StreamA.connect(StreamB). The operations supported by ConnectedStreams differ from those supported by a common DataStream.

This is the result of merging two different streams: it allows you to specify different processing logic for the records of the two streams, and the processed results form a new DataStream. Because the records of both streams are processed in the same operator, they can share state information during processing. Some upper-layer Join operations are implemented through lower-layer Connect operations.
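A minimal sketch (assuming a hypothetical DataStream<String> controlStream and DataStream<Integer> dataStream): a CoMapFunction gives each of the two connected streams its own processing logic while producing a single output stream.

ConnectedStreams<String, Integer> connected = controlStream.connect(dataStream);

DataStream<String> result = connected.map(new CoMapFunction<String, Integer, String>() {
    @Override
    public String map1(String control) {
        // processing logic for records from the first stream
        return "control: " + control;
    }

    @Override
    public String map2(Integer data) {
        // processing logic for records from the second stream
        return "data: " + data;
    }
});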

The Window operation divides a stream into groups by time or by count, according to the split logic you select. When all the records of a group have arrived, you can take the whole group and, for example, traverse it and compute a sum. Processing each group thus produces one output record, and all of these output records form a new basic stream.

For a common DataStream, the allWindow operation is used, which represents a unified Window operation over the entire stream; it therefore cannot use multiple operator instances for parallel computation. To solve this problem, the records are first grouped by Key with the KeyBy method. After that, separate Window computations can be executed in parallel for the records corresponding to different Keys.
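A short sketch of the difference, again using the hypothetical orders stream from above:

// allWindow over the whole stream: a single window operator instance, no parallelism.
orders.timeWindowAll(Time.minutes(1)).sum(1);

// KeyBy first, then Window: records with different Keys are windowed and summed in parallel.
orders.keyBy(0).timeWindow(Time.minutes(1)).sum(1);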

The KeyBy operation is one of the most important and commonly used operations. It is explained in detail below.


Figure 5. Comparison of window operations on a basic stream and on a KeyedStream

Window operations on a KeyedStream enable parallel processing with multiple instances. Figure 5 compares the allWindow operation on a basic DataStream object with the Window operation on a KeyedStream object. To process data in multiple concurrent instances at the same time, use the KeyBy operation to group the data first.

Both the KeyBy and Window operations group the data, but the KeyBy operation splits the stream horizontally and the Window operation splits the stream vertically.

After the data is split with KeyBy, each subsequent operator instance can process the data corresponding to a particular set of Keys. Flink also allows operators to maintain state, and the state of operators on a KeyedStream is stored in a distributed manner.
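As a hedged sketch of keyed state (a hypothetical running-total operator, not part of the article's examples), an operator applied after KeyBy can declare a ValueState; the value it reads and updates is automatically scoped to the current Key:

public static class RunningTotal extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    private transient ValueState<Integer> total;

    @Override
    public void open(Configuration parameters) {
        // Register the state; Flink stores it per Key and in a distributed manner.
        total = getRuntimeContext().getState(new ValueStateDescriptor<>("total", Integer.class));
    }

    @Override
    public void flatMap(Tuple2<String, Integer> in, Collector<Tuple2<String, Integer>> out) throws Exception {
        Integer current = total.value();                 // value for the current Key only
        int updated = (current == null ? 0 : current) + in.f1;
        total.update(updated);
        out.collect(new Tuple2<>(in.f0, updated));
    }
}

// Usage: orders.keyBy(0).flatMap(new RunningTotal());

ValueState and ValueStateDescriptor come from org.apache.flink.api.common.state, and Configuration from org.apache.flink.configuration.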

KeyBy is a deterministic data allocation method (the next section introduces other allocation methods). If a failed job is restarted and its parallelism changes, Flink reassigns the Key groups in such a way that the group handling a particular Key is guaranteed to contain that Key's state, thereby ensuring consistency.

Finally, note that the KeyBy operation only works well if the number of Keys exceeds the number of concurrent instances of the operator. Because all data corresponding to the same Key is sent to the same instance, if there are fewer Keys than instances, some instances receive no data and the computing capacity is not fully utilized.

Other issues

Flink supports physical grouping methods other than KeyBy when exchanging data between operators. As shown in Figure 6, the physical grouping methods in the Flink DataStream include the following (a short sketch of the corresponding calls follows the list):

- Global: The upstream operator sends all records to the first instance of the downstream operator.
- Broadcast: The upstream operator sends each record to all instances of the downstream operator.
- Forward: Each instance of the upstream operator sends records to the corresponding instance of the downstream operator. This method applies only if the number of instances of the upstream operator is the same as the number of instances of the downstream operator.
- Shuffle: The upstream operator randomly selects a downstream instance for each record.
- Rebalance: The upstream operator sends data on a round-robin basis.
- Rescale: If the numbers of instances of the upstream and downstream operators are n and m respectively, and n < m, each upstream instance sends data to ceil(m/n) or floor(m/n) downstream instances on a round-robin basis. If n > m, floor(n/m) or ceil(n/m) upstream instances send data to each downstream instance on a round-robin basis.
- PartitionCustom: If the built-in allocation methods do not suit your needs, you can define a custom grouping method.
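A brief sketch of the corresponding DataStream calls (source is a hypothetical upstream DataStream; MyPartitioner in the last line is a hypothetical user-defined Partitioner):

source.global();      // all records to the first downstream instance
source.broadcast();   // every record to all downstream instances
source.forward();     // one-to-one between corresponding upstream and downstream instances
source.shuffle();     // a random downstream instance for each record
source.rebalance();   // round robin across all downstream instances
source.rescale();     // round robin within a local group of downstream instances
source.partitionCustom(new MyPartitioner(), 0);  // user-defined partitioning on field 0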


Figure 6. Physical grouping methods other than KeyBy

In addition to the grouping methods, another important concept in the Flink DataStream API is the type system.

As shown in Figure 7, Flink DataStream objects are strongly typed: an element type must be specified for each DataStream object, and Flink's underlying serialization mechanism relies on this information to optimize serialization. Specifically, the bottom layer of Flink uses a TypeInformation object to describe a type; the TypeInformation object defines a set of type-related information used by the serialization framework.


Figure 7. Type system for the Flink DataStream API

Flink has a number of commonly used built-in basic types. For these, Flink provides the type information out of the box, so they can be used directly without any additional declarations; Flink identifies the corresponding types through its type inference mechanism. However, there are exceptions.

For example, the Flink DataStream API supports both Java and Scala. The Scala API passes type information through implicit parameters, so when calling the Scala API from Java, the type information must be passed explicitly. Another exception is Java's generic erasure: if the stream type is a generic type, it may not be possible to infer the type information after erasure. In this case, the type information must also be specified explicitly.
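For example (a hedged sketch, assuming a hypothetical DataStream<String> named words), a Java lambda that produces a generic Tuple2 loses its type parameters to erasure, so the element type has to be declared explicitly with a type hint:

DataStream<Tuple2<String, Integer>> pairs = words
    .map(word -> new Tuple2<>(word, 1))
    // The Tuple2 type parameters are erased at compile time, so we pass
    // the TypeInformation explicitly via the Types helper class.
    .returns(Types.TUPLE(Types.STRING, Types.INT));

Types here is org.apache.flink.api.common.typeinfo.Types.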

In Flink, the Java API generally uses the Tuple type when combining multiple fields, while the Scala API more often uses the Row type or case classes. Compared to the Row type, the Tuple type has two restrictions: the number of fields cannot exceed 25, and null values cannot be used in any field.

Finally, Flink also allows you to define custom types with their own TypeInformation, or to serialize them with Kryo. However, this can cause migration issues, so we recommend avoiding custom types where possible.

Example

Let's look at a slightly more complicated example. Suppose there is a data source in the system that monitors orders: when a new order is placed, it outputs the type of the ordered item and the transaction volume as a Tuple2. The program then counts the transaction volume of every item type in real time.

Table 4 An example of real-time order statistics.

import java.util.HashMap;
import java.util.Random;

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class GroupedProcessingTimeWindowSample {
    private static class DataSource extends RichParallelSourceFunction<Tuple2<String, Integer>> {
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
            Random random = new Random();
            while (isRunning) {
                Thread.sleep((getRuntimeContext().getIndexOfThisSubtask() + 1) * 1000 * 5);
                String key = "Classification" + (char) ('A' + random.nextInt(3));
                int value = random.nextInt(10) + 1;

                System.out.println(String.format("Emits\t(%s, %d)", key, value));
                ctx.collect(new Tuple2<>(key, value));
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStream<Tuple2<String, Integer>> ds = env.addSource(new DataSource());
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = ds.keyBy(0);

        keyedStream.sum(1).keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
            @Override
            public Object getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return "";
            }
        }).fold(new HashMap<String, Integer>(), new FoldFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> fold(HashMap<String, Integer> accumulator, Tuple2<String, Integer> value) throws Exception {
                accumulator.put(value.f0, value.f1);
                return accumulator;
            }
        }).addSink(new SinkFunction<HashMap<String, Integer>>() {
            @Override
            public void invoke(HashMap<String, Integer> value, Context context) throws Exception {
                  // Transaction volume of each item type
                  System.out.println(value);
                  // Total transaction volume across all item types
                  System.out.println(value.values().stream().mapToInt(v -> v).sum());
            }
        });

        env.execute();
    }
}

Table 4 shows the implementation of this example. Here we implement a simulated data source that inherits from RichParallelSourceFunction, which is the API for a SourceFunction with multiple parallel instances.

Two methods need to be implemented: the Run method and the Cancel method. Flink calls the Run method of the source directly at runtime; it must output data continuously to form the initial stream. When implementing the Run method, we randomly generate records of item type and transaction volume and emit them with the ctx#collect method. The Cancel method is used when Flink needs to cancel the source task; we use a volatile variable to mark and control the execution state.

We then start building the graph in the Main method. First, create a StreamExecutionEnvironment object. The getExecutionEnvironment method called to create the object automatically determines the environment so that the appropriate object is created. For example, running the main method directly from the IDE creates a local execution environment object.

When run in a real cluster environment, it creates a remote execution environment object instead. Based on the Environment object, we create a source to obtain the initial stream. Then, to count the transaction volume for each item type, we use KeyBy to group the input stream by the first field of the Tuple (the item type) and sum the second field (the transaction volume) of the records corresponding to each Key.

At the bottom layer, the Sum operator uses State to hold the total transaction volume for each Key (item type). When a new record arrives, the Sum operator updates the maintained total and emits an updated record for that item type.

If we only wanted to count the volume per item type, the program could end here: we could add a Sink operator right after the Sum operator to print the continuously updated transaction volume of each item type. However, to count the transaction volume across all types, all records must be output to the same compute node.

We achieve this with a KeyBy that returns the same Key for every record; this groups all records together and sends them to the same instance.

Then we use the Fold method to maintain the volume of each item type within the operator. Note that although the Fold method is marked as deprecated, it cannot yet be fully replaced by other operations in the DataStream API. The method receives an initial value.

Then, as each record of the subsequent stream arrives, the operator calls the FoldFunction that was passed in to update this accumulated value and emits the updated value.

We use a HashMap to keep track of the current transaction volume of each item type, and update it whenever a new record arrives. In this way, the Sink receives the latest HashMap of item types and transaction volumes, and outputs the volume of each item and the total transaction volume based on this value.

This example demonstrates how to use the DataStream API; it could be written more efficiently. The higher-level Table and SQL APIs also support a retraction mechanism that handles this situation more elegantly.


Figure 8. Schematic diagram of how the DataStream API works.

Finally, let's look at how the DataStream API works under the hood. When the DataStream#map method is called, Flink creates a Transformation object at the bottom layer. This object represents a node in the computational logic graph, and it records the user-defined function (UDF) that was passed in, in this case the MapFunction.

Calling more methods creates more DataStream objects, each of which holds its own Transformation object; together, based on the computational dependencies, these objects form a graph structure.

This is the computation graph. Flink then further transforms this graph structure to finally generate the JobGraph needed to submit the job for execution.

Summary

This article introduced the Flink DataStream API, which is a relatively low-level Flink API. In actual development, you often have to handle concepts such as State and Time yourself on top of this API, which can be cumbersome. Subsequent courses will introduce the higher-level Table and SQL APIs; in the future, Table and SQL may well become the mainstream Flink APIs.

However, a lower-level API gives greater expressive power, and the DataStream API may still be required for fine-grained operations.
