Dieser Artikel stellt den Verlauf der ** Apache Flink Python-API ** vor und beschreibt deren Architektur, Entwicklungsumgebung und Schlüsseloperatoren.
Apache Flink ist eine Open-Source-Big-Data-Computing-Engine mit einheitlichen Stream- und Batch-Datenverarbeitungsfunktionen. Apache Flink 1.9.0 bietet eine ML-API (Machine Learning) und eine neue Python-API. Schauen wir uns nun genauer an, warum Apache Flink Python unterstützt.
Laut RedMonk-Statistiken ist Python nach Java und JavaScript die drittbeliebteste Entwicklungssprache. RedMonk ist ein Branchenanalystenunternehmen, das sich auf Softwareentwickler konzentriert. Apache Flink ist eine Big-Data-Computing-Engine mit Stream- und Batch-Datenverarbeitungsfunktionen. Welche Beziehung besteht zwischen dem Thema Python und Apache Flink? Schauen wir uns vor diesem Hintergrund die mittlerweile bekannten Open-Source-Komponenten für Big Data an. Beispielsweise sind das frühe Stapelverarbeitungsframework Hadoop, die Stream-Computing-Plattform STORM, der kürzlich beliebte Spark, das Data Warehouse Hive und die speicherbasierte HBV von KV für die Unterstützung der Python-API bekannt. Es ist ein Open Source Projekt.
Angesichts des vollständigen Ökosystems von Python hat Apache Flink stark in Version 1.9 investiert, um einen völlig neuen PyFlink zu starten. Künstliche Intelligenz (KI) ist als Big Data eng mit Python verwandt.
Laut Statistik entspricht es 0,129% der Stelleninformationen in der ML-Branche, was Python zur gefragtesten Sprache macht. Im Vergleich zu 0,076% der R-Sprache können wir sehen, dass Python in der ML-Branche bevorzugt wird. Python, eine interpretierende Sprache, hat die Designphilosophie "Es gibt nur einen Weg, Dinge zu tun". Aufgrund seiner Einfachheit und Benutzerfreundlichkeit hat sich Python, eine der beliebtesten Sprachen der Welt, zu einem guten Ökosystem im Bereich Big Data Computing entwickelt. Es hat auch vielversprechendes Potenzial im Bereich ML. Neulich haben wir die Python-API mit Apache Flink 1.9 angekündigt, die eine völlig neue Architektur angenommen hat.
Apache Flink ist eine Computer-Engine mit einheitlichen Stream- und Batch-Datenverarbeitungsfunktionen. Die Community legt großen Wert auf Flink-Benutzer und möchte Flink wie Java und Scala mehr Zugriff und Kanäle bieten. Dies macht Flink für mehr Benutzer bequemer und profitiert von dem Wert, den die Big-Data-Computing-Funktionen von Flink bieten. Ab Apache Flink 1.9 startet die Apache Flink-Community eine Python-API mit einer brandneuen technischen Architektur, die die am häufigsten verwendeten Operatoren wie JOIN, AGG und WINDOW unterstützt.
Python API - RoadMap
In Apache Flink 1.9 kann Python benutzerdefinierte Java-Funktionen nutzen, unterstützt jedoch nicht die Definition von benutzerdefinierten Python-nativen benutzerdefinierten Funktionen. Daher unterstützt Apache Flink 1.10 benutzerdefinierte Python-Funktionen und die Datenanalysebibliothek Pandas von Python. Apache Flink 1.11 bietet außerdem Unterstützung für die DataStream- und ML-APIs.
Die neue Python-API-Architektur besteht aus einem Benutzer-API-Modul, einem Kommunikationsmodul zwischen einer virtuellen Python-Maschine (VM) und einer Java-VM sowie einem Modul, das Aufgaben zum Betrieb an den Flink-Cluster sendet.
Wie kommunizieren Python-VMs und Java-VMs? Die Python-VM verfügt über ein Python-Gateway, das eine Verbindung mit der Java-VM unterhält, die über einen GateWayServer verfügt, der Anrufe von der Python-VM empfängt.
Apache Flink-Versionen vor 1.9 unterstützen die Python-API bereits in den Modulen DataSet und DataStream. Sie verwenden jedoch jeweils zwei verschiedene APIs. DataSet-API und DataStream-API. Eine einheitliche Architektur ist für eine Stream-Computing-Engine wie Flink von entscheidender Bedeutung, die über einheitliche Stream- und Batch-Datenverarbeitungsfunktionen verfügt. Die vorhandenen Python DataSet- und DataStream-APIs verwenden die technische Architektur von JPython. JPython kann die Python 3.X-Serie jedoch nicht ordnungsgemäß unterstützen. Infolgedessen wurde die vorhandene Python-API-Architektur aufgegeben und Flink 1.9 hat eine völlig neue Technologiearchitektur übernommen. Diese neue Python-API basiert auf der Tabellen-API.
Die Kommunikation zwischen der Tabellen-API und der Python-API wird in der Kommunikation zwischen der Python-VM und der Java-VM implementiert. Die Python-API kommuniziert mit der Java-API, die schreibt und aufruft. Die Arbeit mit der Python-API ähnelt der Arbeit mit der Java-Tabellen-API. Die neue Architektur bietet folgende Vorteile:
Wenn eine Python-VM eine Anforderung für ein Java-Objekt initiiert, erstellt die Java-VM das Objekt, speichert es in einer Speicherstruktur und weist dem Objekt eine ID zu. Diese ID wird dann an die Python-VM gesendet, die das Objekt mit der entsprechenden Objekt-ID bearbeitet. Da die Python-VM alle Objekte in der Java-VM bearbeiten kann, wird garantiert, dass die Python-Tabellen-API dieselbe Funktionalität wie die Java-Tabellen-API aufweist und vorhandene Modelle zur Leistungsoptimierung nutzen kann.
In dem neuen Architektur- und Kommunikationsmodell erhält die Python-VM die entsprechende Java-Objekt-ID und ruft die Java-Tabellen-API auf, indem einfach der Name und die Parameter der aufrufenden Methode an die Java-VM übergeben werden. Daher folgt das Entwickeln der Python-Tabellen-API denselben Schritten wie das Entwickeln der Java-Tabellen-API. Lassen Sie uns als Nächstes untersuchen, wie Sie einen einfachen Python-API-Job entwickeln.
Im Allgemeinen sind Python-Tabellenjobs in vier Schritte unterteilt. Entscheiden Sie unter Berücksichtigung der aktuellen Situation zunächst, ob der Job im Stapelmodus oder im Streaming-Modus ausgeführt werden soll. Spätere Versionen von Benutzern können diesen Schritt überspringen, aber Benutzer von Apache Flink 1.9 müssen diese Entscheidung treffen.
Sobald Sie sich für einen Jobausführungsmodus entschieden haben, wissen Sie, woher die Daten stammen und wie Sie die Datenquelle, das Schema und den Datentyp definieren. Schreiben Sie dann die Berechnungslogik (die an den Daten ausgeführte Berechnungsoperation) und speichern Sie das endgültige Berechnungsergebnis im angegebenen System. Definieren Sie als Nächstes die Spüle. Definieren Sie das Senkenschema und alle darin enthaltenen Feldtypen so, wie Sie eine Datenquelle definieren würden.
Als nächstes wollen wir verstehen, wie jeder der oben genannten Schritte mithilfe der Python-API codiert wird. Erstellen Sie zunächst eine Ausführungsumgebung, die letztendlich eine Tabellenumgebung sein sollte. Diese Tabellenumgebung muss über ein Tabellenkonfigurationsmodul mit einigen Konfigurationsparametern verfügen, die während des Ausführungsprozesses an die RunTime-Schicht übergeben werden. Dieses Modul muss auch einige benutzerdefinierte Einstellungen bereitstellen, die während der eigentlichen Serviceentwicklungsphase verwendet werden können.
Nach dem Erstellen der Ausführungsumgebung müssen Sie die Tabelle der Datenquelle definieren. Beispielsweise werden die Datensätze in der CSV-Datei durch Kommas (,) getrennt und die Felder in der Feldspalte aufgelistet. Diese Tabelle enthält nur ein Feld, Word, vom Typ String.
Welche Art von Datenstruktur und Datentyp befindet sich in der Tabellen-API-Schicht, nachdem die Datenquelle definiert und beschrieben und die Struktur der Datenquelle in eine Tabelle konvertiert wurde? Als nächstes sehen wir uns an, wie Sie mit with_SCHEMA
Felder und Feldtypen hinzufügen. Hier gibt es nur ein Feld und der Datentyp ist String. Die Datenquelle wird für nachfolgende Abfragen und Berechnungen als Tabelle im Katalog registriert.
Erstellen Sie dann eine Ergebnistabelle. Wenn die Berechnung abgeschlossen ist, speichern Sie das Berechnungsergebnis in einem dauerhaften System. Um beispielsweise einen WordCount-Job zu schreiben, verfügen Sie zunächst über eine Speichertabelle mit zwei Feldern: word und count. Dann registrieren Sie diese Tabelle als Spüle.
Nachdem Sie die Tabellensenke registriert haben, sehen wir uns an, wie die Berechnungslogik geschrieben wird. Tatsächlich ist das Schreiben von WordCount in die Python-API so einfach wie das Schreiben in die Tabellen-API. Im Gegensatz zu DataStream benötigt die Python-API nur eine Anweisungszeile, um einen WordCount-Job zu schreiben. Scannen Sie beispielsweise zuerst die Quelltabelle und gruppieren Sie die Zeilen mit der Anweisung GROUP BY nach Wörtern. Verwenden Sie dann die SELECT-Anweisung, um Wörter auszuwählen, und verwenden Sie die Aggregatfunktion, um die Anzahl für jedes Wort zu berechnen. Fügen Sie abschließend das Berechnungsergebnis in die Ergebnistabelle ein.
Die schwerwiegende Frage ist genau, wie ein WordCount-Job ausgeführt wird. Richten Sie zunächst die Entwicklungsumgebung ein. Verschiedene Versionen der Software können auf verschiedenen Computern installiert sein. Hier sind einige der Anforderungen für eine Softwareversion.
Zweitens erstellen Sie ein binäres Java-Release-Paket basierend auf dem Quellcode. Klonen Sie daher den Code im Hauptzweig, um den 1.9-Zweig zu erhalten. Natürlich können Sie den Mastercode verwenden. Der Mastercode ist jedoch nicht stabil, daher empfehlen wir die Verwendung von 1.9-Verzweigungscode. Fahren wir mit der Prozedur fort. Kompilieren Sie zunächst den Code. Zum Beispiel:
//Laden Sie Gendai herunter
git clone https://github.com/apache/flink.git
//Entführung 1.9 Minuten
cd flink; git fetch origin release-1.9
git checkout -b release-1.9 origin/release-1.9
//Strukturiertes Zwei-Fortschritts-System
mvn clean install -DskipTests -Dfast
Legen Sie das Release-Paket nach dem Kompilieren im entsprechenden Verzeichnis ab.
cd flink-dist/target/flink-1.9.0-bin/flink-1.9.0
tar -zcvf flink-1.9.0.tar.gz flink-1.9.0
Überprüfen Sie nach dem Erstellen der Java-API die API und erstellen Sie das Python-Release-Paket.
Alle Python-Benutzer wissen, dass sie zum Installieren von Paketen über die Pip-Installation entweder ihre abhängigen Bibliotheken in ihre lokale Python-Umgebung integrieren oder diese abhängigen Bibliotheken in ihrer lokalen Umgebung installieren müssen.
Dies gilt auch für Flink. Packen und installieren Sie PyFlink in einem von Pypip erkannten Ressourcenpaket. Verwenden Sie den folgenden Befehl, um das Paket zu kopieren und in Ihrer Umgebung zu installieren.
cd flink-Python;Python setup.py sdist
Dieser Prozess umschließt einfach das Java-Release-Paket mit einigen Java-Paketen und einigen Python-Paketen des PyFlink-Moduls. Suchen Sie im dist-Verzeichnis nach dem neuen Paket "apache-link-1.9.dev0.tar.gz".
cd dist/
Die Datei "apache-flink-1.9.dev0.tar.gz" im Verzeichnis dist ist ein PyFlink-Paket, das Sie mit pip install installieren können. Das Apache Flink 1.9-Installationspaket enthält sowohl Flink Table als auch Flink Table Blink. Flink unterstützt zwei Planer gleichzeitig. Sie können frei zwischen dem Standard-Flink-Planer und dem Blink-Planer wechseln. Wir empfehlen Ihnen, jeden einzelnen selbst auszuprobieren. Versuchen Sie nach dem Verpacken, es in unserer Umgebung zu installieren.
Verwenden Sie einen sehr einfachen Befehl, um zunächst zu überprüfen, ob der Befehl korrekt ist. Überprüfen Sie vor dem Ausführen des Befehls mithilfe von pip die Liste, um festzustellen, ob das Paket bereits installiert ist. Versuchen Sie dann, das im vorherigen Schritt vorbereitete Paket zu installieren. In einem realen Szenario würden Sie ein neues Paket installieren, um das Upgrade zu installieren.
pip install dist/*.tar.gz
pip list|grep flink
Verwenden Sie nach der Installation des Pakets den zuvor geschriebenen WordCount-Job, um zu überprüfen, ob die Umgebung korrekt ist. Um zu überprüfen, ob Ihre Umgebung korrekt ist, klonen Sie das Umgebungscode-Repository direkt, indem Sie den folgenden Befehl ausführen:
git clone https://github.com/sunjincheng121/enjoyment.code.git
cd enjoyment.code; Python word_count.py
Als nächstes versuchen wir es. Suchen Sie die zuvor erstellte wordCount-Jobdatei in diesem Verzeichnis. Verwenden wir Python word_count.py
direkt, um zu überprüfen, ob es ein Problem in der Umgebung gibt. Die Apache Flink Python-API sollte einen Mini-Cluster starten, um WordCount-Jobs auszuführen. Jetzt läuft der Job bereits auf dem Mini-Cluster.
In diesem Prozess liest der Code zuerst die Quelldatei und schreibt das Ergebnis in eine CSV-Datei. Suchen Sie in diesem Verzeichnis die Datei sink.csv
. Eine schrittweise Anleitung finden Sie im Video mit dem Titel "Status Quo und Planung der Apache Flink Python-API" in der Apache Flink Community China.
Lassen Sie uns nun über das Einrichten der integrierten Entwicklungsumgebung (IDE) sprechen. Wir empfehlen die Verwendung von PyCharm zum Entwickeln von Python-bezogener Logik und Jobs.
Weitere Informationen zum Einrichten der IDE erhalten Sie, wenn Sie den QR-Code scannen oder den Blog (https://enjoyment.cool) direkt besuchen. Bitte gib mir. Ich denke, es gibt viele Python-Umgebungen, aber Sie müssen die auswählen, die Sie für Ihre Pip-Installation verwendet haben. Dies ist sehr wichtig. Eine schrittweise Anleitung finden Sie im Video "Aktueller Status und Pläne der Apache Flink Python API".
Wie kann ich einen Job einreichen? Verwenden Sie zunächst die CLI-Methode, um einen Job an einen vorhandenen Cluster zu senden. Sie müssen den Cluster starten, um diese Methode verwenden zu können. Das Build-Verzeichnis befindet sich normalerweise unter Build-Target. Führen Sie diesen Befehl direkt aus, um den Cluster zu starten. Beachten Sie, dass dieser Prozess einen externen Webport verwendet. Stellen Sie die Portnummer in der Datei flink-conf.yaml
ein. Starten Sie dann den Cluster mit den Befehlen in der PPT. Um zu überprüfen, ob der Cluster erfolgreich gestartet wurde, überprüfen Sie die Protokolle oder besuchen Sie die Site mit einem Browser. Wenn der Cluster erfolgreich gestartet wurde, sehen wir uns an, wie Sie einen Job senden.
Verwenden Sie Flink run, um den folgenden Code zum Senden eines Jobs auszuführen.
./bin/flink run -py ~/training/0806/enjoyment.code/myPyFlink/enjoyment/word_count_cli.py
Verwenden Sie py, um eine Python-Datei anzugeben, pym, um ein Python-Modul anzugeben, pyfs, um eine Python-Ressourcendatei anzugeben, und j, um ein JAR-Paket anzugeben.
Mit Apache Flink 1.9 gibt es einen bequemeren Weg. Mit der Python-Shell können Sie die mit der Python-API erzielten Ergebnisse interaktiv schreiben. Die Python-Shell läuft in zwei Modi, lokal und remote, aber es gibt keinen großen Unterschied. Versuchen Sie zunächst den lokalen Modus, indem Sie den folgenden Befehl ausführen.
bin/pyflink-shell.sh local
Dieser Befehl startet einen Mini-Cluster. Wenn Sie den Code ausführen, wird das Flink-Logo mit dem Text FLINK - PYTHON - SHELL und einigen Beispielskripten zurückgegeben, die diese Funktion demonstrieren. Wenn Sie diese Skripte eingeben, werden die korrekten Ausgaben und Ergebnisse zurückgegeben. Hier können Sie entweder Streaming oder Batch schreiben. Sehen Sie sich das Video für eine detaillierte Bedienungsanleitung an.
Jetzt haben Sie ein grundlegendes Verständnis der Apache Flink 1.9 Python Table API-Architektur und der Konfiguration der Python Table API. Um zu sehen, wie ein Job in der IDE ausgeführt und ein Job mithilfe von Flink Run und Python Shell gesendet wird, habe ich ein einfaches WordCount-Beispiel betrachtet. Ich habe auch einige interaktive Möglichkeiten kennengelernt, um die Python-API von Flink zu nutzen. Nach der Einführung der Flink-Einstellungen und einer einfachen Beispieldemo werden die wichtigsten Operatoren in Apache Flink 1.9 erläutert.
Wir haben bereits gesehen, wie man einen Job schafft. Wählen Sie zunächst den Ausführungsmodus: Wählen Sie Streaming oder Batch. Definieren Sie als Nächstes die zu verwendenden Tabellen (Quell- und Ergebnistabellen), das Schema und die Datentypen. Schreiben Sie dann die Berechnungslogik. Schließlich verwenden wir die in der Python-API integrierten Aggregatfunktionen Count, Sum, Max, Min. Wenn ich beispielsweise einen WordCount-Job geschrieben habe, habe ich die Count-Funktion verwendet.
Apache Flink 1.9 erfüllt die meisten Ihrer üblichen Anforderungen. Schauen wir uns nun, abgesehen von dem, was wir bisher gesehen haben, die von Apache Flink 1.9 unterstützten Flink Table API-Operatoren an. Die Flink Table API-Operatoren (Python Table API Operator und Java Table API Operator) unterstützen Operationen wie:
Erstens Einzelstromoperationen wie SELECT, FILTER, Aggregatoperationen, Fensteroperationen und Spaltenoperationen (add_columns, drop_columns
).
Zweitens Dual-Stream-Operationen wie JOIN, MINUS und UNION.
Alle diese Operatoren werden von der Python Table-API unterstützt. In Apache Flink 1.9 ähnelt die Python-Tabellen-API funktional der Java-Tabellen-API. Als nächstes wollen wir verstehen, wie man die obigen Operatoren schreibt und wie man Python-Operatoren entwickelt.
Wie Sie vielleicht in diesem Artikel bemerkt haben, haben wir die Zeitreihen, die ein Attribut des Datenstroms sind, nicht angesprochen. Der objektive Zustand des Datenstroms besteht darin, dass er möglicherweise nicht in Ordnung ist. Apache Flink verwendet den Watermark-Mechanismus, um Datenströme außerhalb der Reihenfolge zu verarbeiten.
Angenommen, Sie haben eine JSON-formatierte Datendatei, die zwei Felder enthält: a und DateTime. Um ein Wasserzeichen zu definieren, müssen Sie beim Erstellen des Schemas eine Zeilenzeitspalte hinzufügen und den Zeilenzeitdatentyp auf Zeitstempel festlegen.
Definieren Sie Wasserzeichen auf verschiedene Arten. Verwenden Sie watermarks_periodic_bounded
, um regelmäßig Wasserzeichen zu senden. Die Zahl 60000 bezieht sich auf 60000 ms, was 60 Sekunden oder 1 Minute entspricht. Diese Definition ermöglicht es dem Programm, Datenströme außerhalb der Reihenfolge innerhalb einer Minute zu verarbeiten. Je höher der Wert, desto toleranter gegenüber Daten außerhalb der Reihenfolge und desto länger die Latenz. Weitere Informationen zur Funktionsweise von Wasserzeichen finden Sie in diesem Blog http://1t.click/7dM.
Abschließend werde ich die Anwendung von benutzerdefinierten Java-Funktionen (UDF) in Apache Flink 1.9 vorstellen. Apache Flink 1.9 unterstützt keine Python-UDFs, Sie können jedoch Java-UDFs in Python nutzen. Apache Flink 1.9 optimiert und erstellt das Tabellenmodul neu. Um eine Java-UDF zu entwickeln, importieren Sie eine einfache Abhängigkeit und entwickeln Sie eine Python-API. Flink-table-common importieren.
Als nächstes schauen wir uns an, wie Sie eine Python-API mit Javas UDF entwickeln. Angenommen, Sie müssen eine UDF entwickeln, die die Länge einer Zeichenfolge berechnet. Sie müssen die Java-Funktion mit t_env.register_java_function
bei Python registrieren und dabei den Namen und den vollständigen Pfad der Java-Funktion übergeben. Sie können dann die UDF unter dem registrierten Namen aufrufen. Weitere Informationen finden Sie in meinem Blog http://1t.click/HQF.
Wie führe ich Java UDF aus? Führen Sie es mit dem Befehl run von Flink aus. Wie bereits erwähnt, verwenden wir -j, um das UDF-JAR-Paket einzuschließen.
Unterstützt Java UDF nur Skalarfunktionen? Java UDF unterstützt nicht nur Skalarfunktionen, sondern auch Tabellenfunktionen und Aggregatfunktionen.
Hier sind einige häufig verwendete Materialien und Links zu meinem Blog. Hoffentlich helfen sie dir.
In diesem Artikel wurde die Roadmap für Verlauf und Entwicklung der Apache Flink Python-API vorgestellt. Als Nächstes erklärte ich, warum wir die Architektur der Apache Flink Python-API und die neuesten verfügbaren Architekturen geändert haben. Außerdem wurden zukünftige Pläne und neue Funktionen für die Apache Flink Python-API beschrieben. Wir empfehlen Ihnen, Ihre Vorschläge und Ideen zu teilen.
Recommended Posts