Eine Kurzanleitung zu PyFlink, die Apache Flink und Python kombiniert

Dieser Artikel stellt die Architektur von PyFlink vor und bietet eine kurze Demo der Verwendung von PyFlink zum Analysieren von CDN-Protokollen.

Warum brauchst du PyFlink?

Flink auf Python und Python auf Flink

Was genau ist PyFlink? Wie der Name schon sagt, ist PyFlink einfach eine Kombination aus Apache Flink und Python oder Flink on Python. Aber was bedeutet Flink in Python? Wenn Sie beide kombinieren, können Sie zunächst alle Funktionen von Flink in Python nutzen. Noch wichtiger ist, dass PyFlink die Rechenleistung des umfangreichen Python-Ökosystems auf Flink nutzen und die Entwicklung dieses Ökosystems weiter erleichtern kann. Mit anderen Worten, es ist Win-Win für beide Parteien. Wenn Sie sich etwas eingehender mit diesem Thema befassen, werden Sie feststellen, dass die Integration des Flink-Frameworks in die Python-Sprache kein Zufall ist.

image.png

Python und das Big Data-Ökosystem

Die Python-Sprache ist eng mit Big Data verwandt. Um dies zu verstehen, werfen wir einen Blick auf einige der Probleme, die Menschen in der Praxis mit Python lösen. Benutzerumfragen haben gezeigt, dass die meisten Benutzer Python für Datenanalyse- und maschinelle Lernanwendungen verwenden. Für diese Art von Szenarien werden einige wünschenswerte Lösungen auch im Big-Data-Bereich angesprochen. Die Integration von Python und Big Data erweitert nicht nur das Publikum von Big-Data-Produkten, sondern verbessert auch die Funktionen des Python-Ökosystems erheblich, indem die eigenständige Architektur auf eine verteilte Architektur erweitert wird. Dies erklärt auch, dass Python eine starke Nachfrage nach der Analyse großer Datenmengen hat.

image.png

Warum Flink und Python?

Die Integration von Python und Big Data entspricht einigen anderen aktuellen Trends. Aber warum unterstützt Flink jetzt Python anstelle von Go, R oder einer anderen Sprache? Und warum wählen die meisten Benutzer PyFlink gegenüber PySpark oder PyHive?

Um zu verstehen, warum, betrachten wir zunächst die Vorteile der Verwendung des Flink-Frameworks.

Als nächstes wollen wir sehen, warum Flink Python gegenüber anderen Sprachen unterstützt. Laut Statistik ist Python nach Java und C die zweitbeliebteste Sprache und hat sich seit 2018 rasant weiterentwickelt. Java und Scala sind die Standardsprachen von Flink, aber die Unterstützung für Python erscheint vernünftig.

image.png

PyFlink ist ein unvermeidliches Produkt der Entwicklung verwandter Technologien. Das Verständnis der Auswirkungen von PyFlink reicht jedoch nicht aus, da das ultimative Ziel darin besteht, Flink- und Python-Benutzern zu helfen und echte Probleme zu lösen. Daher müssen wir uns eingehender mit der Implementierung von PyFlink befassen.

image.png

PyFlink-Architektur

Um PyFlink zu implementieren, müssen wir die wichtigsten zu erreichenden Ziele und die zu lösenden Kernherausforderungen kennen. Was ist der Hauptzweck von PyFlink? Kurz gesagt, der Hauptzweck von PyFlink wird wie folgt detailliert beschrieben.

  1. Stellen Sie Python-Benutzern alle Flink-Funktionen zur Verfügung.
  2. Durch Ausführen der Analyse- / Berechnungsfunktionen von Python auf Flink können Sie die Fähigkeit von Python verbessern, Big-Data-Probleme zu lösen.

Lassen Sie uns dann die wichtigen Probleme analysieren, die gelöst werden müssen, um diese Ziele zu erreichen.

image.png

Stellen Sie Python-Benutzern Flink-Funktionen zur Verfügung

Ist es notwendig, eine Python-Engine auf Flink wie die vorhandene Java-Engine zu entwickeln, um PyFlink zu implementieren? Die Antwort ist nein. Ich habe es mit Flink Version 1.8 und früher versucht, aber es hat nicht funktioniert. Das Prinzip des Grunddesigns besteht darin, einen bestimmten Zweck zu den niedrigsten Kosten zu erreichen. Es ist am einfachsten, aber es ist am besten, eine Schicht der Python-API bereitzustellen und Ihre vorhandene Computer-Engine wiederzuverwenden.

Welche Art von Python-API sollten wir für Flink bereitstellen? High-Level-Tabellen-API, SQL, Stateful DataStream-API usw. sind bekannt. Jetzt, da wir uns der internen Logik von Flink nähern, ist es an der Zeit, die Tabellen-API und die DataStream-API für Python bereitzustellen. Aber was sind die wichtigsten Fragen, die zu dieser Zeit noch offen sind?

image.png

Schlüsselprobleme

Offensichtlich ist es wichtig, einen Handshake zwischen der virtuellen Python-Maschine (PyVM) und der virtuellen Java-Maschine (JVM) herzustellen, der für Flink zur Unterstützung mehrerer Sprachen unerlässlich ist. Um dieses Problem zu lösen, muss eine geeignete Kommunikationstechnologie ausgewählt werden. Okay, lass uns gehen.

image.png

Auswahl der Kommunikationstechnologie für virtuelle Maschinen

Derzeit sind die Lösungen zur Implementierung der Kommunikation zwischen PyVM und JVM Apache Beam und [Py4J](https: // Es gibt zwei (www.py4j.org/?spm=a2c65.11461447.0.0.464e694fRAGzkI). Ersteres ist ein bekanntes Projekt, das mehrere Sprachen und mehrere Engines unterstützt, und letzteres ist eine Lösung, die sich auf die Kommunikation zwischen PyVM und JVM spezialisiert hat. Um den Unterschied zwischen Apache Beam und Py4J zu verstehen, können Sie verschiedene Perspektiven vergleichen und kontrastieren. Betrachten Sie zunächst diese Analogie. Um die Mauer zu überqueren, gräbt Py4J ein Loch wie eine Mogura und Apache Beam zerstört die gesamte Mauer wie ein großer Bär. Unter diesem Gesichtspunkt ist die Implementierung der VM-Kommunikation mit Apache Beam etwas kompliziert. Kurz gesagt, Apache Beam ist universell und in extremen Fällen unflexibel.

image.png

Darüber hinaus erfordert Flink eine interaktive Programmierung wie FLIP-36. Darüber hinaus muss Flink hinsichtlich des API-Designs, insbesondere der mehrsprachigen Unterstützung, semantisch konsistent sein, damit es ordnungsgemäß funktioniert. Py4J ist eindeutig die beste Option zur Unterstützung der Kommunikation zwischen PyVM und JVM, da die vorhandene Architektur von Apache Beam diese Anforderungen nicht erfüllen kann.

image.png

Technische Architektur

Nachdem wir die Kommunikation zwischen PyVM und JVM hergestellt hatten, erreichten wir unser erstes Ziel, die Funktionen von Flink für Python-Benutzer verfügbar zu machen. Dies wurde bereits in Flink Version 1.9 erreicht. Schauen wir uns nun die Architektur der PyFlink-API in Flink Version 1.9 an.

Flink Version 1.9 verwendet Py4J, um die Kommunikation mit virtuellen Maschinen zu implementieren. Das Gateway für PyVM und der Gateway-Server für JVM konnten Python-Anforderungen akzeptieren. Die Python-API bietet auch Objekte wie TableENV und Table, die mit denen der Java-API identisch sind. Daher besteht das Wesentliche beim Schreiben der Python-API darin, wie die Java-API aufgerufen wird. Flink Version 1.9 behebt auch Probleme bei der Stellenvermittlung. Sie können Jobs auf verschiedene Arten senden, z. B. durch Ausführen von Python-Befehlen oder mithilfe der Python-Shell oder der CLI.

image.png

Aber was sind die Vorteile dieser Architektur? Erstens verfügt es über eine einfache Architektur, die eine semantische Konsistenz zwischen der Python-API und der Java-API gewährleistet. Zweitens bietet es eine hervorragende Python-Jobverarbeitungsleistung, die mit Java-Jobs vergleichbar ist. Beispielsweise konnte die Java-API von Flink während des letztjährigen Double 11 2.551 Millionen Datensätze pro Sekunde verarbeiten.

image.png

Führen Sie Python-Analyse- / Berechnungsfunktionen auf Flink aus

Im vorherigen Abschnitt wurde beschrieben, wie Sie die Funktionen von Flink für Python-Benutzer verfügbar machen. Hier erfahren Sie, wie Sie eine Python-Funktion in Flink ausführen. Im Allgemeinen gibt es zwei Möglichkeiten, Python-Funktionen auf Flink auszuführen:

1, ** Wählen Sie eine typische Python-Klassenbibliothek aus und fügen Sie deren API zu PyFlink hinzu. ** Diese Methode ist zeitaufwändig, da es zu viele Klassenbibliotheken für Python gibt. Vor dem Einbetten der API müssen Sie die Python-Ausführung optimieren.

2, ** Basierend auf den Merkmalen der vorhandenen Flink Table-API und der Python-Klassenbibliothek können alle Funktionen der vorhandenen Python-Klassenbibliothek als benutzerdefinierte Funktionen behandelt und in Flink integriert werden. ** Unterstützt in Flink Version 1.10. Was sind die wichtigen Fragen der Funktionsintegration? Auch hier geht es darum, benutzerdefinierte Python-Funktionen auszuführen.

Als nächstes wählen wir die Technologie für dieses wichtige Problem aus. image.png

Auswahl der Technologie zur Ausführung benutzerdefinierter Funktionen

Das Ausführen von benutzerdefinierten Python-Funktionen ist eigentlich ziemlich kompliziert. Nicht nur die Kommunikation zwischen virtuellen Maschinen, sondern auch die Verwaltung der Python-Ausführungsumgebung, die Analyse der zwischen Java und Python ausgetauschten Geschäftsdaten, die Übergabe des Flink-Status-Backends an Python, die Überwachung des Ausführungsstatus usw. .. So kompliziert kommt Apache Beam ins Spiel. Als großer Bär mit Unterstützung für mehrere Engines und Sprachen kann Apache Beam in dieser Situation viel dazu beitragen, wie Apache Beam die Ausführung von benutzerdefinierten Python-Funktionen handhabt. Mal sehen ob.

Im Folgenden wird das Portability Framework dargestellt, eine stark abstrahierte Architektur für Apache Beam, die mehrere Sprachen und Engines unterstützt. Apache Beam unterstützt derzeit verschiedene Sprachen, darunter Java, Go und Python. Beam Fn Runners und Execution am unteren Rand der Abbildung zeigen die Engine und die benutzerdefinierte Feature-Ausführungsumgebung. Apache Beam verwendet Protobuf, um die Datenstruktur und [gRPC](https :: //grpc.io/?spm=a2c65.11461447.0.0.464e694fRAGzkI) Aktiviert die Kommunikation über das Protokoll und kapselt den Kerndienst von gRPC. In dieser Hinsicht ähnelt Apache Beam eher einem Glühwürmchen, das den Ausführungspfad benutzerdefinierter Funktionen in PyFlink beleuchtet. Interessanterweise sind Glühwürmchen zum Maskottchen von Apache Beam geworden, daher ist dies möglicherweise kein Zufall.

Schauen wir uns als nächstes den von Apache Beam bereitgestellten gRPC-Dienst an. image.png

In der folgenden Abbildung repräsentiert der Runner den Java-Operator von Flink. Der Runner wird dem SDK-Worker in der Python-Ausführungsumgebung zugeordnet. Apache Beam verfügt über abstrakte Dienste wie Steuerung, Daten, Status und Protokolle. Tatsächlich werden diese Dienste von Beam Flink Runner seit langem stabil und effizient betrieben. Dies erleichtert die Ausführung von PyFlink UDF. Darüber hinaus bietet Apache Beam Lösungen für API-Aufrufe und die Ausführung benutzerdefinierter Funktionen. PyFlink verwendet Py4J für die Kommunikation zwischen virtuellen Maschinen auf API-Ebene und verwendet das Portability Framework von Apache Beam, um die Ausführungsumgebung für benutzerdefinierte Funktionen festzulegen.

Dies zeigt, dass PyFlink sich strikt an das Prinzip hält, einen bestimmten Zweck zu den niedrigsten Kosten bei der Technologieauswahl zu erreichen, und immer die für die langfristige Entwicklung am besten geeignete Technologiearchitektur verwendet. .. Übrigens habe ich in Zusammenarbeit mit Apache Beam über 20 Optimierungs-Patches an die Beam-Community gesendet.

image.png

Benutzerdefinierte Funktionsarchitektur

Die UDF-Architektur muss nicht nur die Kommunikation zwischen PyVM und JVM implementieren, sondern auch unterschiedliche Anforderungen während der Kompilierungs- und Ausführungsphase erfüllen. Im folgenden benutzerdefinierten Funktionsarchitekturdiagramm von PyLink wird das Verhalten in JVM in Grün und das Verhalten in PyVM in Blau angezeigt. Werfen wir einen Blick auf das lokale Design während der Kompilierung. Das lokale Design basiert auf reinen API-Mapping-Aufrufen. Py4J wird für die VM-Kommunikation verwendet. Jedes Mal, wenn Sie eine Python-API aufrufen, wird die entsprechende Java-API synchron aufgerufen.

Die benutzerdefinierte Funktionsregistrierungs-API (register_function) wird benötigt, um benutzerdefinierte Funktionen zu unterstützen. Sie benötigen auch einige Bibliotheken von Drittanbietern, wenn Sie benutzerdefinierte Python-Funktionen definieren. Das Hinzufügen einer Abhängigkeit erfordert daher eine Reihe zusätzlicher Methoden, z. B. "add_Python_file ()". Beim Schreiben eines Python-Jobs wird auch die Java-API aufgerufen, bevor der Job zum Erstellen eines JobGraph gesendet wird. Sie können dann Jobs auf verschiedene Arten an den Cluster senden, z. B. über die CLI.

image.png Siehe Bild https://yqintl.alicdn.com/a72ad37ed976e62edc9ba8dcb027bf61be8fe3f3.gif

Nun wollen wir sehen, wie die Python- und Java-APIs in dieser Architektur funktionieren. Auf der Java-Seite weist JobMaster TaskManager Jobs wie allgemeine Java-Jobs zu, und TaskManager führt Aufgaben aus, bei denen Operatoren sowohl in JVM als auch in PyVM ausgeführt werden. Benutzerdefinierte Python-Funktionsoperatoren entwerfen verschiedene gRPC-Dienste für die Kommunikation zwischen JVM und PyVM, z. B. DataService für die Geschäftsdatenkommunikation und StateService für Python UDF zum Aufrufen von Java-Status-Backends. Machen. Viele andere Dienste wie Protokollierung und Metriken werden bereitgestellt.

Diese Dienste basieren auf der Fn-API von Beam. Die benutzerdefinierte Funktion wird schließlich auf dem Python-Worker ausgeführt, und der entsprechende gRPC-Dienst gibt das Ergebnis an den benutzerdefinierten Python-Funktionsoperator in der JVM zurück. Python-Worker können als Prozesse in Docker-Containern und sogar in externen Service-Clustern ausgeführt werden. Dieser Erweiterungsmechanismus bildet eine solide Grundlage für die Integration von PyFlink in andere Python-Frameworks. Nachdem Sie ein grundlegendes Verständnis der in PyFlink 1.10 eingeführten benutzerdefinierten Funktionsarchitektur von Python haben, werfen wir einen Blick auf deren Vorteile.

Erstens muss es ein ausgereifter mehrsprachiger Rahmen sein. Die strahlbasierte Architektur kann problemlos erweitert werden, um andere Sprachen zu unterstützen. Zweitens Unterstützung für statusbehaftete benutzerdefinierte Funktionen. Beam abstrahiert zustandsbehaftete Dienste und erleichtert PyFlink die Unterstützung zustandsbezogener benutzerdefinierter Funktionen. Der dritte ist die einfache Wartung. Zwei aktive Communities - Apache Beam und Apache Flink - pflegen und optimieren dasselbe Framework.

image.png

Verwendung von PyFlink

Nachdem Sie die Architektur von PyFlink und die dahinter stehenden Ideen verstanden haben, werfen wir einen Blick auf die spezifischen Anwendungsszenarien von PyFlink.

PyFlink-Anwendungsszenario

Welche Geschäftsszenarien unterstützt PyFlink? Das Anwendungsszenario kann aus zwei Perspektiven analysiert werden. Python und Java. Beachten Sie, dass PyFlink für alle Java-anwendbaren Szenarien geeignet ist.

  1. ** Ereignisgesteuerte Szenarien ** wie Klickplantagen und Überwachung.
  2. ** Datenanalyse ** wie Bestandsverwaltung und Datenvisualisierung.
  3. ** Datenpipeline **, auch als ETL-Szenarien wie Protokollanalyse bezeichnet. 4, ** Maschinelles Lernen ** wie gezielte Empfehlungen. In all diesen Szenarien können Sie PyFlink verwenden. PyFlink gilt auch für Python-spezifische Szenarien wie das wissenschaftliche Rechnen. Bei so vielen Anwendungsszenarien fragen Sie sich möglicherweise genau, welche PyFlink-APIs verfügbar sind. Schauen wir uns jetzt diese Frage an.

image.png

Installieren Sie PyFlink

Sie müssen PyFlink installieren, bevor Sie die API verwenden können. Führen Sie zur Installation von PyFlink derzeit den folgenden Befehl aus: image.png

PyFlink API Die PyFlink-API ist vollständig in die Java-Tabellen-API integriert und unterstützt eine Vielzahl von relationalen und Fensteroperationen. Einige der benutzerfreundlichen PyFlink-APIs sind sogar noch leistungsfähiger als die SQL-API, z. B. spaltenspezifische APIs. Zusätzlich zur API bietet PyFlink mehrere Möglichkeiten, UDFs in Python zu definieren. image.png

Benutzerdefinierte Funktionen in PyFlink definieren

ScalarFunction kann erweitert werden, um mehr Zusatzfunktionen bereitzustellen (z. B. durch Hinzufügen von Metriken). Darüber hinaus unterstützen die Benutzerfunktionen von PyFlink alle von Python unterstützten Methodendefinitionen, einschließlich Lambda-Funktionen, benannter Funktionen und aufrufbarer Funktionen.

Verwenden Sie nach dem Definieren dieser Methoden PyFlink Decorators, um sie zu kennzeichnen und die Eingabe- / Ausgabedatentypen zu beschreiben. Sie können auch die Typhinweisungsfunktion von Python nutzen, um spätere Versionen für die Typableitung weiter zu optimieren. Das folgende Beispiel gibt Ihnen ein besseres Verständnis für die Definition benutzerdefinierter Funktionen.

image.png

Ein Beispiel für die Definition einer benutzerdefinierten Python-Funktion

In diesem Beispiel werden die beiden Zahlen addiert. Importieren Sie die dafür benötigten Klassen und definieren Sie die oben genannten Funktionen. Dies ist ziemlich einfach, also gehen wir zum eigentlichen Fall über.

image.png

Für PyFlink: Echtzeit-Protokollanalyse von Alibaba Cloud CDN

Hier zeigen wir Ihnen anhand der Echtzeit-Protokollanalysefunktion des Alibaba Cloud Content Deliver Network (CDN), wie Sie praktische Probleme mit PyFlink lösen können. Wir verwenden Alibaba Cloud CDN, um das Herunterladen von Ressourcen zu beschleunigen. Im Allgemeinen werden CDN-Protokolle in einem gemeinsamen Muster analysiert. Zunächst werden Protokolldaten vom Randknoten erfasst und in der Nachrichtenwarteschlange gespeichert. Zweitens werden Nachrichtenwarteschlangen mit Echtzeit-Rechenclustern kombiniert, um eine Echtzeit-Protokollanalyse durchzuführen. Die dritte besteht darin, die Analyseergebnisse in das Speichersystem zu schreiben. In diesem Beispiel wird die Architektur instanziiert, Kafka wird als Nachrichtenwarteschlange verwendet, Flink wird für Echtzeit-Computing verwendet und die endgültigen Daten werden in einer MySQL-Datenbank gespeichert. image.png

Bedarf

Der Einfachheit halber haben wir die Anforderungen für die tatsächliche Unternehmensstatistik vereinfacht. In diesem Beispiel werden Seitenaufruf-, Download- und Download-Geschwindigkeitsstatistiken nach Region erfasst. Als Datenformat werden nur Kernfelder ausgewählt. Beispielsweise ist "uuid" die eindeutige Protokoll-ID, "client_ip" die Zugriffsquelle, "request_time" die Downloadzeit der Ressource und "response_size" die Größe der Ressourcendaten. Hier enthält das ursprüngliche Protokoll kein regionales Feld, obwohl wir regionale Statistiken sammeln müssen. Daher müssen wir eine Python-UDF definieren, um den Bereich jedes Datenpunkts gemäß "client_ip" abzufragen. Lassen Sie uns analysieren, wie eine benutzerdefinierte Funktion definiert wird.

Benutzerdefinierte Funktionsdefinition

Hier wird die Namensfunktion der benutzerdefinierten Funktion "ip_to_province ()" definiert. Die Eingabe ist die IP-Adresse und die Ausgabe ist die Bereichsnamenzeichenfolge. Hier werden sowohl Eingabe- als auch Ausgabetypen als Zeichenfolgen definiert. Der Abfragedienst hier dient zu Demonstrationszwecken. In einer Produktionsumgebung sollten Sie sie durch einen zuverlässigen Regionsabfragedienst ersetzen.

image.png

import re
import json
from pyFlink.table import DataTypes
from pyFlink.table.udf import udf
from urllib.parse import quote_plus
from urllib.request import urlopen

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def ip_to_province(ip):
   """
   format:
       {
       'ip': '27.184.139.25',
       'pro': 'Provinz Hebei',
       'proCode': '130000',
       'city': 'Ishiya Sho Stadt',
       'cityCode': '130100',
       'region': 'Shouju',
       'regionCode': '130126',
       'addr': 'Provinz Hebei, Stadt Ishiyasho',
       'regionNames': '',
       'err': ''
       }
   """
   try:
       urlobj = urlopen( \
        'http://whois.pconline.com.cn/ipJson.jsp?ip=%s' % quote_plus(ip))
       data = str(urlobj.read(), "gbk")
       pos = re.search("{[^{}]+\}", data).span()
       geo_data = json.loads(data[pos[0]:pos[1]])
       if geo_data['pro']:
           return geo_data['pro']
       else:
           return geo_data['err']
   except:
       return "UnKnow"

Definition des Steckverbinders

Nachdem wir die Anforderungen analysiert und die benutzerdefinierten Funktionen definiert haben, fahren wir mit der Jobentwicklung fort. In einer typischen Jobstruktur müssen Sie einen Quellconnector zum Lesen von Kafka-Daten und einen Senkenconnector zum Speichern der Operationsergebnisse in einer MySQL-Datenbank definieren. Schließlich müssen wir auch statistische Logik schreiben.

PyFlink unterstützt auch SQL-DDL-Anweisungen, mit denen Sie Quellconnectors mithilfe einfacher DDL-Anweisungen definieren können. Stellen Sie sicher, dass connector.type auf Kafka gesetzt ist. Sie können auch eine DDL-Anweisung verwenden, um einen Sink-Connector zu definieren und connector.type auf jdbc zu setzen. Wie Sie sehen können, ist die Logik zum Definieren eines Connectors sehr einfach. Schauen wir uns als nächstes die Kernlogik der Statistik an.

image.png

kafka_source_ddl = """
CREATE TABLE cdn_access_log (
 uuid VARCHAR,
 client_ip VARCHAR,
 request_time BIGINT,
 response_size BIGINT,
 uri VARCHAR
) WITH (
 'connector.type' = 'kafka',
 'connector.version' = 'universal',
 'connector.topic' = 'access_log',
 'connector.properties.zookeeper.connect' = 'localhost:2181',
 'connector.properties.bootstrap.servers' = 'localhost:9092',
 'format.type' = 'csv',
 'format.ignore-parse-errors' = 'true'
)
"""

mysql_sink_ddl = """
CREATE TABLE cdn_access_statistic (
 province VARCHAR,
 access_count BIGINT,
 total_download BIGINT,
 download_speed DOUBLE
) WITH (
 'connector.type' = 'jdbc',
 'connector.url' = 'jdbc:mysql://localhost:3306/Flink',
 'connector.table' = 'access_statistic',
 'connector.username' = 'root',
 'connector.password' = 'root',
 'connector.write.flush.interval' = '1s'
)
"""

Statistische Kernlogik

In diesem Teil müssen Sie zuerst die Daten aus der Datenquelle lesen und dann "ip_to_province (ip)" verwenden, um "client_ip" in eine bestimmte Region zu konvertieren. Sammeln Sie dann regionale Seitenaufruf-, Download- und Download-Geschwindigkeitsstatistiken. Speichern Sie abschließend die statistischen Ergebnisse in der Ergebnistabelle. Diese statistische Logik verwendet nicht nur benutzerdefinierte Python-Funktionen, sondern auch zwei in Flink integrierte Java AGG-Funktionen, "sum" und "count".

image.png

#Kernstatistik
t_env.from_path("cdn_access_log")\
   .select("uuid, "
           "ip_to_province(client_ip) as province, " #Name des IP-Konvertierungsbezirks
           "response_size, request_time")\
   .group_by("province")\
   .select( #Berechnete Anzahl von Fragen
           "province, count(uuid) as access_count, " 
           #Berechnete Lademenge
           "sum(response_size) as total_download,  " 
           #Berechnete Ladegeschwindigkeit
           "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
   .insert_into("cdn_access_statistic")

Vollständiger Code für die Echtzeit-Protokollanalyse

Überprüfen wir nun den Code erneut. Zuerst müssen Sie die Kernabhängigkeiten importieren, dann die ENV erstellen und schließlich den Planer einrichten. Flink unterstützt derzeit Flink- und Blink-Planer. Wir empfehlen die Verwendung eines Blinkplaners.

Führen Sie als Nächstes die DDL-Anweisung aus, um die zuvor definierte Kafka-Quelltabelle und MySQL-Ergebnistabelle zu registrieren. Die dritte besteht darin, die Python-UDF zu registrieren. Beachten Sie, dass Sie in Ihrer API-Anforderung andere UDF-Abhängigkeiten angeben und diese zusammen mit dem Job an den Cluster senden können. Schreiben Sie abschließend die statistische Kernlogik, rufen Sie den Executor an und senden Sie den Job. Bisher haben Sie einen Echtzeit-Protokollanalysejob für das Alibaba Cloud CDN erstellt. Lassen Sie uns die tatsächlichen statistischen Ergebnisse überprüfen.

image.png

import os

from pyFlink.datastream import StreamExecutionEnvironment
from pyFlink.table import StreamTableEnvironment, EnvironmentSettings
from enjoyment.cdn.cdn_udf import ip_to_province
from enjoyment.cdn.cdn_connector_ddl import kafka_source_ddl, mysql_sink_ddl

#创 KEN Table Environment, Planer für den parallelen Gebrauch
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
   env,
   environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())

#Kafka Nummerntabelle
t_env.sql_update(kafka_source_ddl)
#创 KEN MySql-Ergebnistabelle
t_env.sql_update(mysql_sink_ddl)

#Hinweis 册 IP-Konvertierungsdistriktname UDF
t_env.register_function("ip_to_province", ip_to_province)

#Additionsabhängiger Python-Text
t_env.add_Python_file(
    os.path.dirname(os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_udf.py")
t_env.add_Python_file(os.path.dirname(
    os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_connector_ddl.py")

#Kernstatistik
t_env.from_path("cdn_access_log")\
   .select("uuid, "
           "ip_to_province(client_ip) as province, " #Name des IP-Konvertierungsbezirks
           "response_size, request_time")\
   .group_by("province")\
   .select( #Berechnete Anzahl von Fragen
           "province, count(uuid) as access_count, " 
           #Berechnete Lademenge
           "sum(response_size) as total_download,  " 
           #Berechnete Ladegeschwindigkeit
           "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
   .insert_into("cdn_access_statistic")

#Unternehmen
t_env.execute("pyFlink_parse_cdn_log")

Ausgabeergebnis der Echtzeit-Protokollanalyse

Ich habe die Scheindaten als CDN-Protokolldaten an Kafka gesendet. Auf der rechten Seite der folgenden Abbildung werden Seitenaufruf-, Download- und Download-Geschwindigkeitsstatistiken in Echtzeit nach Regionen erfasst.

image.png Siehe Analyseergebnisse https://yqintl.alicdn.com/e05da15f039d8331896ee1e7f294585416809ad9.gif

Wie sehen die Zukunftsaussichten für PyFlink aus?

Im Allgemeinen ist die Geschäftsentwicklung mit PyFlink einfach. Sie können Geschäftslogik einfach über SQL- oder Tabellen-APIs schreiben, ohne die zugrunde liegende Implementierung verstehen zu müssen. Werfen wir einen Blick auf die allgemeinen Aussichten für PyFlink.

Roadmap nach Zweck

Die Entwicklung von PyFlink zielte darauf ab, Python-Benutzern Flink-Funktionen zur Verfügung zu stellen und Python-Funktionen in Flink zu integrieren. Gemäß der unten gezeigten PyFlink-Roadmap wurde zuerst eine Kommunikation zwischen PyVM und JVM hergestellt. Und Flink 1.9 bietet die Python Table-API, die Python-Benutzern die Funktionalität der vorhandenen Flink Table-API eröffnet. In Flink 1.10 werden die Apache Beam-Integration, die Einstellungen für die benutzerdefinierte Funktionsausführungsumgebung von Python, das Python-Abhängigkeitsmanagement für andere Klassenbibliotheken, die benutzerdefinierte Funktions-API-Definition zur Unterstützung von benutzerdefinierten Python-Funktionen usw. ausgeführt. , Vorbereitet, um Python-Funktionen in Flink zu integrieren.

Um die Funktionalität von verteiltem Python zu erweitern, verwendet PyFlink Pandas Series und [DataFrame](https: //pandas.pydata). org / pandas-docs / stabile / Getting_started / dsintro.html? Spm = a2c65.11461447.0.0.464e694fRAGzkI) wird unterstützt, und benutzerdefinierte Pandas-Funktionen können direkt mit PyFlink verwendet werden. In Zukunft planen wir, benutzerdefinierte Python-Funktionen auf SQL-Clients zu aktivieren, um die Verwendung von PyFlink zu vereinfachen. Es bietet auch eine Python ML-Pipeline-API, um PyFlink für Python-Benutzer zum maschinellen Lernen verfügbar zu machen. Die Überwachung der Ausführung der benutzerdefinierten Funktionen von Python ist in Produktion und Geschäft sehr wichtig. Daher bietet PyFlink eine zusätzliche Metrikverwaltung für benutzerdefinierte Python-Funktionen. Diese Funktionen sind in Flink 1.11 integriert.

Diese sind jedoch nur ein Teil der zukünftigen Entwicklungspläne von PyFlink. In Zukunft gibt es noch viel mehr zu tun, einschließlich der Optimierung der PyFlink-Leistung, der Bereitstellung von Grafikcomputer-APIs und der Unterstützung von Pandas auf der nativen Pandas-API von Flink. Indem wir die vorhandenen Funktionen von Flink für Python-Benutzer kontinuierlich verfügbar machen und die leistungsstarken Funktionen von Python in Flink integrieren, erreichen wir unser ursprüngliches Ziel, das Python-Ökosystem zu erweitern.

image.png Siehe Bild https://yqintl.alicdn.com/f85ba5bd5d24a01558e751bcdc8887b3f5d565ca.gif

PyFlink 1.11 Vorschau

Werfen wir einen kurzen Blick auf die Punkte von PyFlink im aktualisierten Flink 1.11.

Funktionalität

Schauen wir uns die Kernfunktionen von PyFlink genauer an, das auf Flink 1.11 basiert. Wir konzentrieren uns auf die Funktionalität, Leistung und Benutzerfreundlichkeit von PyFlink und werden die von Pandas benutzerdefinierten Funktionen in PyFlink 1.11 unterstützen. Auf diese Weise können die praktischen Funktionen der Klassenbibliothek von Pandas wie kumulative Verteilungsfunktionen direkt in PyFlink verwendet werden.

image.png

Außerdem wird die ML-Pipeline-API in PyFlink integriert, um die Geschäftsanforderungen in Szenarien für maschinelles Lernen zu erfüllen. Hier ist ein Beispiel für die Implementierung der KMeans-Methode mit PyFlink.

image.png

Performance

Wir werden uns auch auf die Verbesserung der Leistung von PyFlink konzentrieren. Wir werden versuchen, die Ausführungsleistung von Python UDF mithilfe von Codegen, CPython, optimierter Serialisierung und Deserialisierung zu verbessern. Vorläufige Vergleiche zeigen, dass PyFlink 1.11 etwa 15-mal besser abschneidet als PyFlink 1.10.

image.png

Benutzerfreundlichkeit

Um die Verwendung von PyFlink zu vereinfachen, unterstützen wir benutzerdefinierte Python-Funktionen in SQL DDL- und SQL-Clients. Dies macht PyFlink auf einer Vielzahl von Kanälen verfügbar.

image.png

PyFlink Roadmap, Mission, Vision

Wir haben PyFlink bereits definiert und seine Auswirkungen, die API-Architektur, die benutzerdefinierte Funktionsarchitektur sowie die Kompromisse und Vorteile der Architektur erläutert. Wir haben den CDN-Fall in Flink 1.11, die PyFlink-Roadmap, die PyFlink-Punkte und mehr gesehen. Aber was brauchst du noch?

Schauen wir uns zum Schluss die Zukunft von PyFlink an. Wie sieht der Ausblick für PyFlink aus, der darauf abzielt, die Funktionen von Flink für Python-Benutzer verfügbar zu machen und Python-Funktionen auf Flink auszuführen? Wie Sie vielleicht wissen, ist PyFlink Teil von Apache Flink und enthält eine Laufzeit- und API-Schicht.

Wie wird sich PyFlink in diesen beiden Schichten entwickeln? Zur Laufzeit erstellt PyFlink allgemeine gRPC-Dienste (Steuerung, Daten, Status usw.) für die Kommunikation zwischen JVM und PyVM. Dieses Framework abstrahiert die Operatoren von benutzerdefinierten Java Python-Funktionen und erstellt einen Python-Ausführungscontainer, um die Python-Ausführung auf verschiedene Arten zu unterstützen. Beispielsweise kann PyFlink als Prozess in einem Docker-Container und sogar in einem externen Service-Cluster ausgeführt werden. Unbegrenzte Erweiterungen sind aktiviert, insbesondere in Form von Sockets, wenn sie in einem externen Service-Cluster ausgeführt werden. Dies alles spielt eine wichtige Rolle bei der späteren Integration von Python.

In Bezug auf die API werden wir die Python-basierte API in Flink verfügbar machen, um die Mission zu erfüllen. Dies hängt auch vom Py4J VM-Kommunikationsframework ab. PyFlink wird nach und nach weitere APIs unterstützen, darunter die Java-API von Flink (Python-Tabellen-API, UDX, ML-Pipeline, DataStream, CEP, Gelly, Status-API usw.) und die Pandas-API, die bei Python-Benutzern am beliebtesten ist. Es ist ein Zeitplan. Basierend auf diesen APIs wird PyFlink weiterhin in andere Ökosysteme integriert und die Entwicklung erleichtern, z. B. Notebook, Zeppelin, Jupyter, Alink. / alink-is-now-open-source_595847? spm = a2c65.11461447.0.0.464e694fRAGzkI) und funktioniert mit Alibabas Open-Source-Version Flink. Derzeit ist die Funktionalität von PyAlink vollständig integriert. PyFlink wird auch in bestehende KI-Systemplattformen wie den bekannten TensorFlow integriert.

Zu diesem Zweck können wir sehen, dass missionsbasierte Streitkräfte PyFlink am Leben erhalten. Auch hier besteht die Mission von PyFlink darin, die Funktionen von Flink für Python-Benutzer verfügbar zu machen und Python-Parsing- und -Computing-Funktionen für Flink auszuführen. Derzeit arbeiten die Hauptverantwortlichen von PyFlink mit dieser Mission hart in der Community.

image.png Siehe Bild https://yqintl.alicdn.com/908ea3ff2a2fc93d3fe2797bbe9c302ad83c0581.gif

PyFlink Core Committer

Abschließend möchte ich den Core Committer von PyFlink vorstellen.

Der letzte Committer bin ich. Meine Einführung ist am Ende dieses Beitrags. Wenn Sie Fragen zu PyFlink haben, wenden Sie sich bitte an unser Committer-Team.

image.png

Bei häufigen Problemen empfehlen wir Ihnen, eine E-Mail an jemanden auf der Flink-Benutzerliste zu senden, um sie zu teilen. Im Falle eines dringenden Problems empfehlen wir, eine E-Mail an den Committer zu senden. Für eine effektive Speicherung und Freigabe können Sie jedoch in Stackoverflow Fragen stellen. Bevor Sie eine Frage stellen, durchsuchen Sie zunächst den Inhalt Ihrer Frage, um festzustellen, ob sie beantwortet wurde. Wenn nicht, geben Sie bitte Ihre Frage klar an. Vergessen Sie nicht, Ihrer Frage das PyFlink-Tag hinzuzufügen.

image.png

Überblick

In diesem Artikel habe ich mir PyFlink genauer angesehen. Die PyFlink-API-Architektur verwendet Py4J für die Kommunikation zwischen PyVM und JVM und wurde entwickelt, um die semantische Konsistenz zwischen Python- und Java-APIs aufrechtzuerhalten. Die benutzerdefinierte Python-Funktionsarchitektur lässt sich in das Portability Framework von Apache Beam integrieren, um effiziente und stabile benutzerdefinierte Python-Funktionen bereitzustellen. Es interpretiert auch die Gedanken hinter der Architektur, die technischen Kompromisse und die Vorzüge bestehender Gebäude.

Als Nächstes stellte ich die Geschäftsszenarien vor, die auf PyFlink angewendet werden können, und stellte anhand des Echtzeitprotokolls von Alibaba Cloud CDN die Funktionsweise von PyFlink vor.

Danach habe ich mir die PyFlink-Roadmap angesehen und eine Vorschau der PyFlink-Punkte in Flink 1.11 angezeigt. Mit PyFlink 1.11 können Sie eine 15-fache oder mehr Leistungsverbesserung im Vergleich zu PyFlink 1.10 erwarten. Schließlich analysierten wir die Missionen von PyFlink, "PyFlink für Python-Benutzer verfügbar machen" und "Die Analyse- und Berechnungsfunktionen von Python auf Flink ausführen".

Über den Autor

Der Autor dieses Artikels, Son Kinjo, kam 2011 zu Alibaba. Nach neun Jahren in Alibaba leitete Herr Son die Entwicklung vieler interner Kernsysteme, darunter das Verhaltensprotokoll-Managementsystem der Alibaba Group, Arirang, das Cloud-Transcodierungssystem und das Dokumentenkonvertierungssystem. Anfang 2016 lernte er die Apache Flink Community kennen. Beteiligte sich zunächst als Entwickler an der Stadtentwicklung. Danach leitete er die Entwicklung spezifischer Module und war verantwortlich für die Erstellung der Apache Flink Python API (PyFlink). Derzeit ist er PMC-Mitglied von Apache Flink und ALC (Peking) und Kommissar von Apache Flink, Apache Beam und Apache IoT DB.

Recommended Posts

Eine Kurzanleitung zu PyFlink, die Apache Flink und Python kombiniert
So schreiben Sie eine Meta-Klasse, die sowohl Python2 als auch Python3 unterstützt
Ein netter Nimporter, der Nim und Python verbindet
Ich möchte ein Programm ausführen und verteilen, das die Größe von Bildern in Python3 + Pyinstaller ändert
Python-Skript, das das Azure-Status-RSS crawlt und an Hipchat sendet
Ein Programm, das ein paar Kilogramm BMI und Standardgewicht verlangt [Python]
Ein Skript, das Ihre bevorzugten Python-Module und Binärdateien in einer einzigen Lambda-Ebene kombiniert
Ein schneller Vergleich der Testbibliotheken von Python und node.js.
[Python] So schreiben Sie eine Dokumentzeichenfolge, die PEP8 entspricht
[Python] Eine praktische Bibliothek, die Kanji in Hiragana konvertiert
Erstellt einen Toolsver, der Betriebssystem, Python, Module und Toolversionen an Markdown ausspuckt
[C / C ++] Übergeben Sie den in C / C ++ berechneten Wert an eine Python-Funktion, um den Prozess auszuführen, und verwenden Sie diesen Wert in C / C ++.
Python-Fehlermeldungen sind spezifisch und leicht zu verstehen "ga" (davor Doppelpunkt (:) und Semikolon (;))
Lern-Roadmap, mit der Sie Services mit Python von Grund auf neu entwickeln und veröffentlichen können
Eine Geschichte, die es einfach macht, den Wohnbereich mit Elasticsearch und Python abzuschätzen
Ein Hinweis, der eine Python-Anwendung von Circle CI auf Elastic Beanstalk bereitstellt und Slack benachrichtigt
[Python] Ein Programm, um die Anzahl der Äpfel und Orangen zu ermitteln, die geerntet werden können
Ein Python-Skript, das ein GTK-Bild (Clipboard) in einer Datei speichert.
Eine Standardmethode zum Entwickeln und Verteilen von Paketen in Python
Versuchen Sie, ein Unterfenster mit PyQt5 und Python zu öffnen
Erstellen Sie eine Python-Umgebung und übertragen Sie Daten auf den Server
Erstellen Sie den Code, der in Python "A und vorgeben B" ausgibt
Versuchen Sie einfach, einen Webhook mit ngrok und Python zu erhalten
Ein Python-Skript, das Oracle-Datenbankdaten in CSV konvertiert
[Python] Eine Geschichte, die in eine Rundungsfalle zu geraten schien
Apache mod_auth_tkt und Python AuthTkt
Herausforderungen und Chancen von Apache Flink
Ich möchte einen Platzhalter verwenden, den ich mit Python entfernen möchte
Reguläre Ausdrücke, die in Python leicht und solide zu erlernen sind
Pythons regulärer Ausdruck, str und unicode sind nüchtern
[Python] So fügen Sie einer Tabelle Zeilen und Spalten hinzu (pandas DataFrame)
[Python] Ein Memo, das ich versucht habe, mit Asyncio zu beginnen
So erstellen Sie eine Überwachungskamera (Überwachungskamera) mit Opencv und Python
Ich habe versucht, mit Selenium und Python einen regelmäßigen Ausführungsprozess durchzuführen
Schreiben Sie ein Programm, das das Programm missbraucht und 100 E-Mails sendet
Python-Spezialcodec, der zu wissen scheint, aber nicht weiß
Ein Python-Skript, das auf dem Mac erstellte ._DS_Store- und ._ * -Dateien löscht
[Python] Ein Hinweis, dass ich das Verhalten von matplotlib.pyplot zu verstehen begann
[Python] Ein Programm, das den Inhalt der Liste nach links dreht
Python a + = b und a = a + b sind unterschiedlich
Python 3.6 unter Windows ... und zu Xamarin.
[Einführung in Python3 Tag 1] Programmierung und Python
Dies und das von Python-Eigenschaften
[Python] gibt A [oder / und] B zurück
Python-Protokollierung und Dump an JSON
5 Möglichkeiten zum Erstellen eines Python-Chatbots
Selen und Python zum Öffnen von Google
Python-Muster, die für die Welt freigegeben und später überprüft wurden
[Python] Ich habe einen Dekorateur gemacht, der keinen Nutzen zu haben scheint.
[Einführung in Python] Was ist der Unterschied zwischen einer Liste und einem Taple?
Mit PEP8 und PEP257 ist es nicht peinlich, Python-Codierung zu zeigen!
Ich habe eine Webanwendung in Python erstellt, die Markdown in HTML konvertiert
[Python] Ein Programm, das die Anzahl der gepaarten Socken berechnet
Ein Hinweis, mit dem Sie die Python-Umgebung von Pineapple mit pyenv ändern können
[Python] Erstellen Sie einen Linebot, um den Namen und das Alter auf das Bild zu schreiben
Migration von Python2 zu Python3 (Python2 wird als virtuelle Umgebung neu erstellt und existiert gleichzeitig)
So setzen Sie in Python ein Leerzeichen mit halber Breite vor Buchstaben und Zahlen.