Apache Flink Python API: history, architecture, development environment, key operators

This article introduces the history of the ** Apache Flink Python API ** and describes its architecture, development environment, and key operators.

Apache Flink Python API History, Current Status and Future Developments

Why Apache Flink supports Python

Apache Flink is an open source big data computing engine with unified stream and batch data processing capabilities. Apache Flink 1.9.0 provides a Machine Learning (ML) API and a new Python API. Now let's take a closer look at why Apache Flink supports Python.

--Python is one of the most popular development languages

image.png

According to RedMonk statistics, Python is the third most popular development language after Java and JavaScript. RedMonk is an industry analyst company focused on software developers. Apache Flink is a big data computing engine with stream and batch data processing capabilities. What is the relationship between the topic Python and Apache Flink? With this question in mind, let's take a look at the now-famous big data-related open source components. For example, the early batch processing framework Hadoop, the stream computing platform STORM, the recently popular Spark, the data warehouse Hive, and the KV storage-based HBase are well-known for their Python API support. It's an open source project.

--Python is supported by many open source projects.

image.png

Given Python's complete ecosystem, Apache Flink has invested heavily in version 1.9 to launch a whole new PyFlink. As big data, artificial intelligence (AI) is closely related to Python.

--Python is backed by machine learning (ML).

image.png

According to statistics, it matches 0.129% of job listings in the ML industry, making Python the most sought after language. Compared to 0.076% of the R language, we can see that Python is preferred in the ML industry. Python, an interpretive language, has a design philosophy of "there is only one way to do things". Due to its simplicity and ease of use, Python, one of the most popular languages in the world, has become a good ecosystem in the field of big data computing. It also has promising potential in the field of ML. So the other day, Apache Flink 1.9 announced the Python API, which adopted a completely new architecture.

Apache Flink is a computing engine with unified stream and batch data processing capabilities. The community attaches great importance to Flink users and wants to provide more access and channels to Flink like Java and Scala. This will make Flink more convenient for more users and will benefit from the value provided by Flink's big data computing capabilities. Starting with Apache Flink 1.9, the Apache Flink community launches a Python API with a brand new technical architecture that supports the most commonly used operators such as JOIN, AGG, and WINDOW.

Python API - RoadMap image.png

In Apache Flink 1.9, Python can take advantage of user-defined Java functions, but it does not support the definition of Python-native user-defined functions. Therefore, Apache Flink 1.10 supports Python user-defined functions and Python's data analysis library Pandas. Apache Flink 1.11 also adds support for the Data Stream and ML APIs.

Apache Flink Python API architecture and development environment

Python table API architecture

image.png

The new Python API architecture consists of a user API module, a communication module between a Python virtual machine (VM) and a Java VM, and a module that puts tasks into and operates a Flink cluster.

How do the Python VM and Java VM communicate? The Python VM has a Python gateway that maintains a connection with the Java VM that has a GateWayServer that receives calls from the Python VM.

Apache Flink versions prior to 1.9 already support the Python API in the DataSet and DataStream modules. However, they each use two different APIs. DataSet API and DataStream API. A unified architecture is crucial for a stream computing engine like Flink, which has unified stream and batch data processing capabilities. The existing Python DataSet and DataStream APIs use JPython's technical architecture. However, JPython cannot properly support the Python 3.X series. As a result, the existing Python API architecture has been abandoned and Flink 1.9 has adopted a completely new technology architecture. This new Python API is based on the Table API.

Communication between the Table API and the Python API is implemented in the communication between the Python VM and the Java VM. The Python API communicates with the Java API, which writes and calls. Working with the Python API is similar to working with the Java Table API. The new architecture has the following advantages:

--You don't have to create new operators, instead you can easily maintain consistency with the functionality of the Java Table API. --Optimize the Python API using the Java Table API optimization model. This ensures that jobs written using the Python API will provide optimal performance.

image.png

When the Python VM initiates a request for a Java object, the Java VM creates the object, stores it in a storage structure, and assigns an ID to the object. It then sends that ID to the Python VM, which manipulates the object with the corresponding object ID. The Python VM can work with all objects in the Java VM, ensuring that the Python Table API has the same functionality as the Java Table API and can leverage existing performance optimization models.

image.png

In the new architecture and communication model, the Python VM gets the corresponding Java object ID and calls the Java Table API simply by passing the name and parameters of the calling method to the Java VM. Therefore, developing the Python Table API follows the same steps as developing the Java Table API. Next, let's explore how to develop a simple Python API job.

Python Table API-Job Development

image.png

A Python table job is generally divided into four steps. Considering the current situation, first decide whether to run the job in batch mode or streaming mode. Users of later versions can skip this step, but users of Apache Flink 1.9 must make this decision.

Once you've decided on a job execution mode, you know where the data comes from and how you define the data source, schema, and data type. Then write the calculation logic (the calculation operation performed on the data) and persist the final calculation result to the specified system. Next, define the sink. Just as you define a data source, you define the sink's schema and all the field types in it.

Next, let's understand how to code each of the above steps using the Python API. First, create an execution environment, which should ultimately be a table environment. This table environment must have a Table Config module with some configuration parameters passed to the RunTime layer during the execution process. This module must also provide some custom settings that can be used during the actual service development phase.

image.png

After creating the execution environment, you need to define the data source table. As an example, the data records in the CSV file are separated by commas (,) and the fields are listed in the field column. This table contains only one field, Word, which is of type String.

image.png

After defining and describing the data source and converting the structure of the data source into a table, what kind of data structure and data type will be in the Table API layer? Next, let's see how to add fields and field types using with_SCHEMA. Here, there is only one field and the data type is String. The data source is registered as a table in the catalog for subsequent queries and calculations.

image.png

Then create a result table. When the calculation is finished, save the calculation result to a persistent system. For example, to write a WordCount job, you first have a storage table with two fields, word and count. Then register this table as a sink.

image.png

After registering the table sink, let's see how to write the calculation logic. In fact, writing a WordCount in the Python API is as easy as writing it in the Table API. Unlike DataStream, the Python API requires only one line of statement to write a WordCount job. For example, first scan the source table and use a GROUP BY statement to group the rows by word. Then use the SELECT statement to select the words and use the aggregate function to calculate the count for each word. Finally, insert the calculation result into the result table.

Python Table API-Development environment

image.png

The fatal question is exactly how to run a WordCount job. First, set up the development environment. Different versions of the software may be installed on different machines. Here are some of the requirements for a software version.

image.png

Second, build a binary Java release package based on the source code. So clone the code in the master branch to get the 1.9 branch. Of course, you can use the master code. However, the master code is not stable, so we recommend using the 1.9 branch code. Let's proceed with the procedure. First, compile the code. For example:

//Download source code
git clone https://github.com/apache/flink.git
//Abduction 1.9 minutes
cd flink; git fetch origin release-1.9
git checkout -b release-1.9 origin/release-1.9
//Binary system development package
mvn clean install -DskipTests -Dfast

After compiling, place the release package in the corresponding directory.

cd flink-dist/target/flink-1.9.0-bin/flink-1.9.0
tar -zcvf flink-1.9.0.tar.gz flink-1.9.0

After building the Java API, validate the API and build the Python release package.

image.png

All Python users know that in order to install packages through pip install, they must either integrate their dependent libraries with their local Python environment or install these dependent libraries in their local environment.

This also applies to Flink. Package and install PyFlink in a resource package recognized by Pypip. Use the following command to copy the package and install it in your environment.

cd flink-Python;Python setup.py sdist

This process simply wraps the Java release package along with some Java packages and some Python packages for PyFlink modules. Look for the new ʻapache-link-1.9.dev0.tar.gz `package in the dist directory.

cd dist/

The ʻapache-flink-1.9.dev0.tar.gz` file in the dist directory is a PyFlink package that you can install with pip install. The Apache Flink 1.9 installation package includes both Flink Table and Flink Table Blink. Flink supports two planners at the same time. You can freely switch between the default Flink planner and the Blink planner. We encourage you to try each one yourself. After packaging, try installing it in our environment.

image.png

Use a very simple command to first check if the command is correct. Before running the command, use pip to check the list to see if the package is already installed. Then try installing the package you prepared in the previous step. In a real-world scenario, install a new package to install the upgrade.

pip install dist/*.tar.gz
pip list|grep flink

image.png

After installing the package, use the WordCount job I wrote earlier to check if your environment is correct. To verify that your environment is correct, clone the environment code repository directly by running the following command:

git clone https://github.com/sunjincheng121/enjoyment.code.git
cd enjoyment.code; Python word_count.py

Next, let's try it. Find the previously created wordCount job file in this directory. Let's use python word_count.py directly to check if there is a problem in the environment. The Apache Flink Python API should launch a mini-cluster to run WordCount jobs. Now, the job is already running on the mini-cluster.

In this process, the code first reads the source file and writes the result to a CSV file. In this directory, find the sink.csv file. For detailed step-by-step instructions, see the video titled "The Status Quo and Planning of Apache Flink Python API" posted in Apache Flink Community China.

image.png

Now let's talk about setting up an integrated development environment (IDE). We recommend using PyCharm for developing Python-related logic and jobs.

For more information on setting up the IDE, scan the QR code or go directly to the blog (https://enjoyment.cool). Please give me. I think there are many Python environments, but you need to choose the one you used for your pip installation. This is very important. For step-by-step instructions, see the video titled "Apache Flink Python API Current Status and Plans."

Python Table API-Submit Job

image.png

What are some ways to submit a job? First, use the CLI method to submit the job to an existing cluster. You must start the cluster to use this method. The build directory is usually under build-target. Run this command directly to start the cluster. Note that this process uses an external web port. Set the port number in the flink-conf.yaml file. Then start the cluster using the commands in the PPT. To verify that the cluster started successfully, check the logs or visit the site in your browser. If the cluster starts successfully, let's see how to submit a job.

image.png

Use Flink run to run the following code to submit a job.

./bin/flink run -py  ~/training/0806/enjoyment.code/myPyFlink/enjoyment/word_count_cli.py

Use py to specify a Python file, pym to specify a Python module, pyfs to specify a Python resource file, and j to specify a JAR package.

image.png

With Apache Flink 1.9, there is a more convenient way. The Python Shell allows you to interactively write the results obtained with the Python API. The Python Shell runs in two modes, local and remote, but there is no big difference. First, try the local mode by running the following command.

bin/pyflink-shell.sh local

This command starts a mini-cluster. Running the code returns a Flink logo with the text FLINK --PYTHON --SHELL and some sample scripts demonstrating this feature. Entering these scripts will return the correct output and results. Here you can write either streaming or batch. Watch the video for detailed operating instructions.

Now you have a basic understanding of the architecture of the Python Table API in Apache Flink 1.9 and how to configure the Python Table API. To see how to run a job in the IDE and submit a job using Flink run and Python Shell, I've considered a simple WordCount example. I also experienced some interactive ways to take advantage of Flink's Python API. After introducing the Flink preferences and a simple example demo, we'll discuss the key operators in Apache Flink 1.9.

Introduction and application of key operators of Flink Python API

Python table API operator

image.png

We've already seen how to create a job. First, select the run mode: Stream or Batch. Next, define the tables (source and result tables), schema, and data types to use. Then write the calculation logic. Finally, we use the Python API's built-in aggregate functions Count, Sum, Max, Min. For example, when I wrote a WordCount job, I used the Count function.

Apache Flink 1.9 meets most of your usual needs. Now, apart from what we've seen so far, let's take a look at the Flink Table API operators supported by Apache Flink 1.9. The Flink Table API operators (Python Table API operators and Java Table API operators) support operations such as:

First, single-stream operations such as SELECT, FILTER, aggregate operations, window operations, and column operations (ʻadd_columns, drop_columns`).

Second, dual stream operations such as JOIN, MINUS, and UNION.

All of these operators are supported by the Python Table API. In Apache Flink 1.9, the Python Table API is functionally similar to the Java Table API. Next, let's understand how to write the above operators and how to develop Python operators.

Python Table API Operators-Watermark Definition

image.png

As you may have noticed in this article, we haven't touched on time series, which is an attribute of data streams. The objective state of the data stream is that it may be out of order. Apache Flink uses Watermark's mechanism to process out-of-order data streams.

How to define Watermark in Python API?

Suppose you have a JSON-formatted data file that contains two fields, a and DateTime. To define the watermark, you need to add a rowtime column when creating the Schema and set the rowtime data type to Timestamp.

Define watermarks in various ways. Use watermarks_periodic_bounded to send watermarks on a regular basis. The number 60000 refers to 60000ms, which corresponds to 60 seconds or 1 minute. This definition allows the program to process out-of-order data streams within a one-minute period. Therefore, the higher the value, the more resistant to out-of-order data and the longer the latency. For more information on how watermarking works, see this blog http://1t.click/7dM.

Python Table API-Java UDF

image.png

Finally, I will introduce the application of Java User Defined Functions (UDF) in Apache Flink 1.9. Apache Flink 1.9 does not support Python UDFs, but you can use Java UDFs with Python. Apache Flink 1.9 optimizes and rebuilds the Table module. To develop a Java UDF, import a simple dependency and develop the Python API. Import Flink-table-common.

image.png

Next, let's look at how to develop a Python API using Java UDFs. Suppose you need to develop a UDF that calculates the length of a string. You need to register the Java function with Python using t_env.register_java_function, passing the name and full path of the Java function. You can then call the UDF using the registered name. For more information, see my blog http://1t.click/HQF

image.png

How to run Java UDF? Run it using Flink's run command. As mentioned earlier, I'm using -j to include the UDF JAR package.

Does Java UDF only support scalar functions? Java UDF supports not only scalar functions, but also table and aggregate functions.

image.png

Python Table API reference link

Here are some commonly used materials and links to my blog. Hopefully they will help you.

image.png

Overview

This article introduced the history and development roadmap of the Apache Flink Python API. Next, I explained why I changed the architecture of the Apache Flink Python API and the latest architecture available. It also described future plans and new features for the Apache Flink Python API. We encourage you to share your suggestions and ideas.

Recommended Posts

Apache Flink Python API: history, architecture, development environment, key operators
Python development environment construction
About Python development environment
python2.7 development environment construction
Organize your Python development environment
[ev3dev × Python] Build ev3dev development environment
[MEMO] [Development environment construction] Python
[For organizing] Python development environment
Prepare Python development environment on Ubuntu
Prepare your first Python development environment
[Python3] Development environment construction << Windows edition >>
Python development environment options for May 2020
Python development environment construction on macOS
Vim + Python development environment setting memo
Emacs settings for Python development environment
Install Python development environment on Windows 10
Emacs Python development environment construction memo
Checking the NAOqi Python development environment
Prepare Python development environment with Atom
Python (anaconda) development environment construction procedure (SpringToolsSuites) _2020.4
Python development environment for macOS using venv 2016
[Development environment] Python with Xcode [With screen transition]
Blender 2.82 or later + python development environment notes
How to prepare Python development environment [Mac]
Python3 + venv + VSCode + macOS development environment construction
Environment construction procedure: Ubuntu + Apache2 + Python + Pyramid
Build Python development environment (pythonz, virtualenv, direnv)
The strongest Python development environment PyCharm's recommendation
Building a Python development environment for AI development
Windows + gVim + Poetry python development environment construction
The strongest Python integrated development environment PyCharm