[PYTHON] Apache Flink 1.9.0: Integrated Alibaba Blink functionality

**Apache Flink 1.9.0** is an important update that integrates many of Alibaba's Blink features, including fine-grained recovery for batch jobs and a Blink-based query engine.

On August 22, 2019, [Apache Flink 1.9.0](https://flink.apache.org/news/2019/08/22/release-1.9.0.html) was officially released. This is the first release since [Alibaba Blink](https://medium.com/@alitech_2017/alibaba-blink-real-time-computing-for-big-time-gains-707fdd583c26), Alibaba's internal version of Flink, was merged into the official Apache Flink codebase.

As such, there are some important changes in this all-new update. Notable features of this release are [fine-grained batch recovery for batch jobs](https://hub.packtpub.com/apache-flink-1-9-0-releases-with-fine-grained-batch-recovery-state-processor-api-and-more/) and a preview of a [new Blink-based query engine](https://www.i-programmer.info/news/197-data-mining/13043-apache-flink-19-adds-new-query-engine.html) for the Table API and SQL queries. This release also delivers one of the most requested features, the State Processor API, which gives Flink DataSet jobs the flexibility to read and write savepoints. In addition, it includes a rebuilt WebUI, a preview of Flink's new Python Table API, and integration with the Apache Hive ecosystem.

From the beginning, the goal of the Apache Flink project was to develop a stream processing system that could unify and run many forms of real-time and offline data processing applications, in addition to various event-driven applications. With this release, the Apache Flink community and Alibaba Cloud have taken a major step towards that goal by integrating Blink's stream and batch processing capabilities under a unified runtime.

I hope this article will be helpful to anyone who is interested in this update and wants to know what to expect. It describes the major new features, improvements, and important changes to Apache Flink in this release, and also takes a look at Flink's future development plans.

Note: The Flink 1.9 binary distribution and source artifacts, along with the updated [documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.9/), are now available from the Flink project's [download page](https://flink.apache.org/downloads.html). Flink 1.9 is API compatible with previous 1.x releases for APIs annotated with the @Public annotation. You can share your ideas with the community through the Flink mailing lists and [JIRA](https://issues.apache.org/jira/projects/FLINK/summary).

New features and improvements

Fine-grained recovery for batch jobs (FLIP-1)

With this new release, the time it takes to recover a batch job (DataSet, Table API, or SQL) from a task failure has been significantly reduced.

Until Flink 1.9, a task failure in a batch job was recovered through the costly process of canceling all tasks and restarting the entire job. In other words, the job had to start from scratch, invalidating all progress.

With this version, however, Flink keeps intermediate results at the boundaries of network shuffles and uses this data to recover only the tasks affected by a failure. A failover region is the set of tasks connected through pipelined data exchanges; a job's batch shuffle connections define the boundaries of its failover regions. For more information, see [FLIP-1](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures).


To use this new failover strategy, make sure there is a jobmanager.execution.failover-strategy: region entry in flink-conf.yaml.

Note: The default configuration of the 1.9 distribution includes this entry, but if you reuse a configuration file from a previous setup, you will need to add it manually.
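For reference, this is the only line that needs to be present in flink-conf.yaml (shown here as a minimal excerpt of the configuration file):

```yaml
# Restart only the tasks of the affected failover region instead of the whole job
jobmanager.execution.failover-strategy: region
```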

The "Region" failover strategy described above speeds up and improves the recovery of streaming jobs, that is, jobs without shuffles such as keyBy () and rebalance. When such a job is recovered, only the tasks in the affected pipeline (failover area) will be restarted. For all other streaming jobs, the recovery behavior is the same as in previous versions of Flink.

State Processor API (FLIP-43)

Up to Flink 1.9, accessing job state from the outside was limited to the (still experimental) Queryable State feature. This release introduces a powerful new library for reading, writing, and modifying state snapshots using the DataSet API. In practice, this means:

--You can bootstrap the state of a Flink job by reading data from an external system, such as an external database, and converting it into a savepoint.

--State in a savepoint can be queried using any of Flink's batch APIs (DataSet, Table, SQL), for example to analyze relevant state patterns or check for discrepancies in state to support auditing and troubleshooting.

--The schema of state in a savepoint can be migrated offline, in contrast to the previous approach, which required online migration on schema access.

--You can identify and correct invalid data in savepoints.

The new State Processor API covers all variations of snapshots: savepoints, full checkpoints, and incremental checkpoints. For more information, see FLIP-43.
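To give an idea of how this looks in practice, here is a minimal Java sketch that loads an existing savepoint and reads an operator's list state as a DataSet. The savepoint path, operator uid, state name, and state type are hypothetical placeholders, and the flink-state-processor-api dependency is assumed to be on the classpath:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

public class ReadSavepointExample {

    public static void main(String[] args) throws Exception {
        // The State Processor API is built on top of the DataSet API.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Load an existing savepoint (path and state backend are placeholders).
        ExistingSavepoint savepoint = Savepoint.load(
                env, "hdfs:///flink/savepoints/savepoint-123abc", new MemoryStateBackend());

        // Read list state that was registered under the given operator uid and state name.
        DataSet<Long> bufferedElements =
                savepoint.readListState("my-operator-uid", "buffered-elements", Types.LONG);

        // The state can now be analyzed with regular DataSet operations, e.g. counted.
        System.out.println("Number of buffered elements: " + bufferedElements.count());
    }
}
```

Bootstrapping or modifying state works along the same lines through the library's writer API; see FLIP-43 for the full picture.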

Stop-with-Savepoint (FLIP-34)

"Cancel-with-savepoint" , A common operation for stopping and restarting Flink jobs, forking and updating. However, the existing implementation of this had the problem that the data persistence of the data transferred to the external storage system could not be fully guaranteed. To improve end-to-end semantics when stopping a job, Flink 1.9 introduces a new SUSPEND mode that stops a job at a savepoint while ensuring the consistency of the output data. You can suspend the job in the Flink CLI client as follows:

`bin/flink stop -p [:targetSavepointDirectory] :jobId`

On success, the final job status is set to FINISHED, so you can easily detect whether the requested operation failed. See FLIP-34 for more information.

Flink Web UI refactoring

After a [discussion](http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Change-underlying-Frontend-Architecture-for-Flink-Web-Dashboard-td24902.html) about Flink's Web UI, the community decided to refactor this component on top of the latest stable version of Angular (Angular 7.0 or later). The redesigned UI is the default in 1.9.0; however, Flink 1.9.0 includes a link to switch back to the older Web UI.



Note: Given these major changes, it is not guaranteed that the older version of the Web UI will keep the same functionality as the new one going forward. The older Web UI will be deprecated once the new version is considered stable.

Preview of the new Blink SQL Query Processor

With Alibaba's Blink contributed to Apache Flink, the community has been working on integrating Blink's query optimizer and runtime for the Table API and SQL into Flink. The first step in this effort was to refactor the monolithic flink-table module into smaller modules ([FLIP-32](https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions)). This results in well-layered modules and well-defined interfaces between the Java and Scala API modules, the optimizer, and the runtime modules.


The next step was to evolve the Blink planner to implement the new optimizer interface, so that there are now two pluggable query processors for executing Table API and SQL statements: the pre-1.9 Flink processor and the new Blink-based query processor. The Blink-based query processor provides better SQL coverage; 1.9 fully supports TPC-H, and TPC-DS support is planned for the next release.

It also improves batch query performance through cost-based plan selection and broader query optimization with a larger set of optimization rules, as well as improved code generation and tuned operator implementations. In addition, the Blink-based query processor enables more powerful stream processing, with long-awaited new features such as dimension table joins, TopN, deduplication, optimizations to resolve data skew in aggregation scenarios, and more convenient built-in functions.

Note: The semantics of the two query processors and the set of supported operations are mostly matched, but not exactly the same. See the release notes for more information.

However, the integration of Blink's query processor is not yet complete. The pre-1.9 processor is still the default processor in Flink 1.9 and is recommended for production environments.

You can enable the Blink processor by configuring the EnvironmentSettings when creating the TableEnvironment. The selected processor must be on the classpath of the running Java process. In a cluster setup, both query processors are automatically loaded into the classpath by default. When running queries from an IDE, you need to explicitly [add the planner dependencies](https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/#table-program-dependencies) to your project.
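As a minimal Java sketch, selecting the Blink processor for a streaming program looks roughly like this (assuming the Blink planner module, flink-table-planner-blink, and the Table API dependencies are already on the classpath):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BlinkPlannerExample {

    public static void main(String[] args) {
        // Select the Blink-based query processor in streaming mode;
        // .useOldPlanner() would select the pre-1.9 processor instead.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        // Create a TableEnvironment backed by the selected planner.
        TableEnvironment tableEnv = TableEnvironment.create(settings);
    }
}
```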

Other improvements to the Table API and SQL

In addition to the exciting developments of the Blink planner, the community has also worked to improve these interfaces.

--Scala-free Table API and SQL for Java users ([FLIP-32](https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions)).

As part of the refactoring and splitting of the flink-table module, two separate API modules for Java and Scala were created. For Scala users nothing really changes, but Java users can now use the Table API and SQL without pulling in a Scala dependency.

--Table API and SQL type system refactoring (see FLIP-37)

The community has implemented a [new data type system](https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/types.html#data-types) to detach the Table API from Flink's [TypeInformation](https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/types_serialization.html#flinks-typeinformation-class) class and improve its compliance with the SQL standard. This work is still in progress and is expected to be completed in the next release. In Flink 1.9, UDFs have not yet been ported to the new type system.

--Multi-column and multi-row transformations in the Table API (see FLIP-29)

The functionality of the Table API has been extended with a set of transformations that support multi-row and multi-column inputs and outputs. These transformations make it much easier to implement processing logic that would be cumbersome to express with relational operators.

--New unified catalog API

The community has reworked and unified some of the existing catalog APIs to provide a single way of handling internal and external catalogs. This work was primarily initiated for the integration with Hive, but it also improves the overall experience of managing catalog metadata in Flink.

--DDL support in the SQL API (see [FLINK-10232](https://issues.apache.org/jira/browse/FLINK-10232))

Prior to version 1.9, Flink SQL only supported DML statements such as SELECT and INSERT. External tables, in particular table sources and sinks, had to be registered using Java/Scala code or configuration files. In Flink 1.9, the community has added support for SQL DDL statements to register and drop tables, in particular the CREATE TABLE and DROP TABLE commands. However, the community did not yet add stream-specific syntax extensions for defining timestamp extraction and watermark generation. Full support for streaming scenarios is planned for the next release.
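As a rough sketch of how the new DDL support can be used from Java, the snippet below registers a table with CREATE TABLE and then queries it. The table name, schema, and the connector/format properties in the WITH clause are illustrative placeholders rather than a verified configuration:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class SqlDdlExample {

    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Register an external table with a CREATE TABLE statement.
        // The connector and format properties are placeholders, not a verified setup.
        tableEnv.sqlUpdate(
                "CREATE TABLE users (" +
                "  user_id BIGINT," +
                "  user_name VARCHAR" +
                ") WITH (" +
                "  'connector.type' = 'filesystem'," +
                "  'connector.path' = 'file:///tmp/users.csv'," +
                "  'format.type' = 'csv'" +
                ")");

        // The registered table can now be used like any other table ...
        Table result = tableEnv.sqlQuery("SELECT user_name FROM users");

        // ... and removed again with a DROP TABLE statement:
        // tableEnv.sqlUpdate("DROP TABLE users");
    }
}
```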

Hive integration preview (FLINK-10556)

Apache Hive is widely used in the Hadoop ecosystem to store and query large amounts of structured data. In addition to being a query processor, Hive features a catalog called Metastore for managing and organizing large datasets. A common integration point for query processors is Hive's Metastore, which makes the data managed by Hive available to them.

Recently, the community began implementing an external catalog for Flink's Table API and SQL queries that connects to Hive's Metastore. Flink 1.9 allows you to query and process various data formats stored in Hive. In addition, the Hive integration lets you use Hive UDFs in Flink Table API and SQL queries. For more information, see FLINK-10556.

Previously, tables defined in the Table API and SQL were always temporary. The new catalog connector additionally allows tables created with SQL DDL statements to be persisted in Metastore. This means you can connect to Metastore and register a table that, for example, reads data from a Kafka topic. From then on, that table is queryable whenever your catalog is connected to Metastore.
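The following is a rough Java sketch of registering and using a HiveCatalog; the catalog name, default database, hive-site configuration directory, Hive version, and table name are placeholder values, and the Hive connector dependencies are assumed to be on the classpath:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogExample {

    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Connect to an existing Hive Metastore.
        // Catalog name, default database, conf directory, and Hive version are placeholders.
        HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/opt/hive-conf", "2.3.4");
        tableEnv.registerCatalog("myhive", hiveCatalog);

        // Make it the current catalog so that tables are resolved (and persisted) there.
        tableEnv.useCatalog("myhive");

        // Tables managed by Hive can now be queried directly.
        Table orders = tableEnv.sqlQuery("SELECT * FROM some_hive_table");
    }
}
```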

Note: Please note that Hive support in Flink 1.9 is experimental. The community plans to stabilize these features in the next release and welcomes your feedback.

Preview of the new Python Table API (FLIP-38)

This release introduces the first version of the Python Table API (see FLIP-38). This is a first step towards the community's goal of bringing full-fledged Python support to Flink. The feature is designed as a slim Python wrapper around the Table API, which essentially translates Python Table API method calls into Java Table API calls. In Flink 1.9, the Python Table API does not yet support UDFs and only provides standard relational operations. UDF support in Python is on the roadmap for future releases.

If you want to try the new Python API, you have to [build PyFlink manually](https://ci.apache.org/projects/flink/flink-docs-release-1.9/flinkDev/building.html#build-pyflink). Have a look at the [walkthrough](https://ci.apache.org/projects/flink/flink-docs-release-1.9/tutorials/python_table_api.html) in the documentation and start exploring on your own. The community is also working on providing a pyflink Python package that can be installed through pip.

Significant changes

--The Table API and SQL are now part of the default configuration of the Flink distribution. Previously, the corresponding JAR file had to be moved from ./opt to ./lib to enable the Table API and SQL.

--The machine learning library (flink-ml) has been removed in preparation for FLIP-39.

--The old DataSet and DataStream Python APIs have been removed. We recommend using the new Python Table API introduced in [FLIP-38](https://cwiki.apache.org/confluence/display/FLINK/FLIP-38%3A+Python+Table+API).

--Flink can be compiled and run on Java 9. However, some components that interact with external systems, such as connectors, filesystems, and reporters, may not work because the corresponding projects may have skipped Java 9 support.

Release notes

If you plan to upgrade from an existing version of Flink, please see the [release notes](https://ci.apache.org/projects/flink/flink-docs-release-1.9/release-notes/flink-1.9.html) for a detailed overview of the changes and new features.
