Dieser Artikel zielt darauf ab, Apache Kafka durch praktische Übungen zu berühren und so vielen Menschen wie möglich zu helfen, die Vorteile von Kafka zu verstehen. Ich werde die detaillierte Implementierung und den Mechanismus von Kafka weglassen und hoffe, dass dies eine Gelegenheit ist, das Image zu erweitern, z. B. welche Art der Verarbeitung durch die tatsächliche Verwendung von Kafka möglich ist und ob dies eine Lösung für bestehende Probleme darstellt. ..
Wenn Sie die Grundlagen von Kafka verstehen, können Sie diese überspringen.
Kafka wurde 2011 von LinkedIn als "Distributed Messaging Queue" angekündigt. Derzeit heißt es auf der offiziellen Seite von Kafka "Distributed Streaming Platform", aber im Grunde sollte es als Messaging-Warteschlange erkannt werden.
Es verfügt über die folgenden Funktionen und wird in verschiedenen großen Systemen als flexible, skalierbare und fehlertolerante Messaging-Plattform eingesetzt.
Die ausgereifte Community bietet auch APIs in verschiedenen Sprachen und eine Vielzahl von Plugins namens Kafka Connect, die eine entwicklerfreundliche Umgebung bieten.
Kafka verwendet Begriffe entsprechend seiner Rolle und setzt sich grob wie folgt zusammen. Absender der Nachricht: Produzent Nachrichtenempfänger: Verbraucher Nachrichtenvermittler: Broker Jede Nachrichtenwarteschlange: Thema Warteschlange für Sharded-Themen: Partition
Für die Clusterverwaltung von Kafka muss außerdem Zookeeper gestartet werden.Lassen Sie uns bisher tatsächlich unsere Hände bewegen. Dieses Mal werden wir in der folgenden Umgebung mit der praktischen Arbeit fortfahren.
macOS: 10.14 python: 3.7.4 docker: 2.1.0.5 kafka-docker: https://github.com/wurstmeister/kafka-docker KSQL: https://github.com/confluentinc/ksql
Lassen Sie uns zuerst Kafka-Docker lokal klonen. Erstellen Sie ein Verzeichnis in Ihrer lokalen Umgebung und klonen Sie es von github.
mkdir ~/kafka && cd ~/kafka
git clone https://github.com/wurstmeister/kafka-docker.git
cd kafka-docker
Da kafka-docker docker-compose.yml bereitstellt, möchte ich "docker-compose up -d" so wie es ist ausführen, aber diese Datei muss geändert werden. ref) https://github.com/wurstmeister/kafka-docker#advertised-hostname Sie müssen die angekündigte IP wie unter beschrieben konfigurieren.
Ändern Sie die IP-Adresse, die direkt als "KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100" geschrieben wurde, in die Umgebungsvariable "DOCKER_HOST_IP".
sed -i -e 's/KAFKA_ADVERTISED_HOST_NAME:.*/KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_HOST_IP}/g' docker-compose.yml
Wenn Sie im Voraus ein Thema in Kafka generieren möchten, das als Nächstes gestartet wird, können Sie den folgenden Wert festlegen.
ref) https://github.com/wurstmeister/kafka-docker#automatically-create-topics
Fügen Sie Folgendes in die Zeile nach dem geänderten KAFKA_ADVERTISED_HOST_NAME
ein, den Sie zuvor geändert haben.
KAFKA_CREATE_TOPICS: "topic1:3:2,topic2:3:2
Das ist alles zur Vorbereitung. Jetzt fangen wir mit Kafka an.
# .Es ist eine gute Idee, es festzulegen, wenn Sie eine Shell wie bashrc starten.
export DOCKER_HOST_IP=$(ipconfig getifaddr en0)
docker-compose up -d --build
docker-compose ps
#Die Portnummern können variieren.
# Name Command State Ports
# ----------------------------------------------------------------------------------------------------------------------
# kafka-docker_kafka_1 start-kafka.sh Up 0.0.0.0:32771->9092/tcp
# kafka-docker_zookeeper_1 /bin/sh -c /usr/sbin/sshd ... Up 0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
Erhöhen Sie die Anzahl der Broker auf drei.
docker-compose scale kafka=3
docker-compose ps
# Name Command State Ports
# ----------------------------------------------------------------------------------------------------------------------
# kafka-docker_kafka_1 start-kafka.sh Up 0.0.0.0:32771->9092/tcp
# kafka-docker_kafka_2 start-kafka.sh Up 0.0.0.0:32772->9092/tcp
# kafka-docker_kafka_3 start-kafka.sh Up 0.0.0.0:32773->9092/tcp
# kafka-docker_zookeeper_1 /bin/sh -c /usr/sbin/sshd ... Up 0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
Lassen Sie uns nun Kafka tatsächlich mit der CLI betreiben.
#Zugriff im Docker-Container
./start-kafka-shell.sh $DOCKER_HOST_IP
#Brokerinformationen werden ausgegeben
bash-4.4# broker-list.sh
# 10.XXX.XXX.XXX:32772,10.XXX.XXX.XXX:32773
# 10.XXX.XXX.XXX:32771
# docker-compose.yml KAFKA_CREATE_Stellen Sie sicher, dass das in THEMEN angegebene Thema generiert wird
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server `broker-list.sh`
# topic1
# topic2
#Thema erstellen
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --create --topic topic-from-cli --partitions 3 --replication-factor 2 --bootstrap-server `broker-list.sh`
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server `broker-list.sh`
# topic-from-cli
# topic1
# topic2
Dies ist das Ende einer einfachen Kafka-Funktionsprüfung. Im geklonten Repository befindet sich eine sh-Datei, mit der Sie Producer and Consumer mit CLI testen können. Es ist daher eine gute Idee, dies ebenfalls zu versuchen. Ich denke, dass es selten vorkommt, Producer / Consumer über CLI im eigentlichen System zu implementieren. Erstellen wir also einen Producer mit Python3, damit Sie über die App eine Nachricht an Topic senden können.
Lassen Sie uns die Kafka-Bibliothek für Python3 installieren. Bitte installieren Sie jedes fehlende Modul entsprechend.
cd ~/kafka
pip install kafka-python
Erstellen Sie als Nächstes die folgenden Dateien. Normalerweise schreibe ich Python nicht selbst. Es ist nur ein Code für die Funktionsprüfung.
topic1-producer.py
rom kafka import KafkaProducer
from datetime import datetime
import subprocess
import json
import random
cwd_name = subprocess.check_output("pwd").decode('utf-8').rstrip('\n') + "/kafka-docker"
host_ip = subprocess.check_output("ipconfig getifaddr en0", shell=True).decode('utf-8').rstrip('\n')
netstat_result = subprocess.check_output("DOCKER_HOST_IP=${host_ip} && docker-compose exec kafka netstat |awk '{ print $5 }' |grep '^1.*:32.*'", cwd=cwd_name, shell=True).decode('utf-8').rstrip('\n')
kafka_ips = list(set(netstat_result.split('\n')))
# print(kafka_ips)
date = datetime.now().strftime("%Y/%m/%d")
messageId = datetime.now().strftime("%Y/%m/%d-%H:%M:%S:%f")
user_id = random.choice([1000, 2000, 3000])
word_id = random.randint(1,5)
word_pattern = {1: 'hello', 2: 'world', 3: 'hoge', 4: 'fuga', 5: 'hello world'}
word_count = random.randint(1,3)
word_keys = random.sample(word_pattern.keys(), word_count)
producer = KafkaProducer(bootstrap_servers=kafka_ips, value_serializer=lambda m: json.dumps(m).encode('utf-8'))
for word_type in word_keys:
kafka_msg = {'userId': user_id, 'messageId': messageId, 'message': {'wordId': word_type, 'word': word_pattern[word_type]}}
producer.send('topic1', key=date.encode('utf-8'), value=kafka_msg).get(timeout=1)
Verwenden Sie zwei Anschlusslaschen. Eine dient zum Überprüfen von Nachrichten im Thema und die andere zum Senden von Nachrichten.
# tab1
#Starten Sie Kafka CLI
./start-kafka-shell.sh $DOCKER_HOST_IP
#Consumer Startup
# --from-Mit der Startoption ist es möglich, Nachrichten anzuzeigen, die bereits bei Topic angekommen sind.
bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=topic1 --from-beginning --bootstrap-server `broker-list.sh`
---
# tab2
python topic1-producer.py
Wenn Sie das Python-Skript von tab2 ausführen, befindet es sich auf der Seite von tab1.
{"userId": 1000, "messageId": "2019/12/21-22:46:03:131468", "message": {"wordId": 2, "word": "world"}}
Sie sollten in der Lage sein, die Nachricht so fließen zu sehen. Wenn Sie das Skript wie folgt ausführen, wird die Nachricht alle 3 Sekunden eingehen.
# bash
while true; do python topic1-producer.py; sleep 3s; done;
# fish
while true; python topic1-producer.py; sleep 3s; end;
Als nächstes führen wir die Streaming-Verarbeitung durch. Streaming ist nichts Besonderes, und die gesamte Nachricht (Ereignis), die endlos zum Thema fließt, wird einfach als "Streaming" bezeichnet. KSQL ist eine API, mit der Sie SQL-ähnliche Abfragen nach diesen fließenden Ereignissen zum Filtern und Aggregieren abfragen können. Es ist möglich, die fortlaufenden Daten von Nachrichten, die zum Thema fließen, in andere fortlaufende Daten (Stream) oder aggregierte Daten (Tabelle) zu ändern und diese Daten als neues Thema für die Verarbeitung durch eine andere Anwendung zu verwenden. Weitere Informationen finden Sie unter dem folgenden Link.
ref) https://kafka.apache.org/documentation/streams/ ref) https://www.youtube.com/watch?v=DPGn-j7yD68
Stream und Tabelle werden grundsätzlich (rund um die Uhr) immer ausgeführt. Wenn Sie also feststellen, dass sie wie Thema behandelt werden, ist die Eingabe einfacher.
Bereiten Sie zunächst KSQL vor, das von Confluent entwickelt wurde.
cd ~/kafka
git clone https://github.com/confluentinc/ksql.git
cd ksql
# kafka-Kehren Sie zum Docker-Verzeichnis zurück
cd ../kafka-docker
#laufende IP-Adresse von kafka+Portnummer abrufen
export KSQL_BOOTSTRAP_SERVERS=(docker-compose exec kafka netstat |awk '{ print $5 }' |grep '^1.*:32.*' |sort |uniq |tr '\n' ',')
#Wechseln Sie in das ksql-Verzeichnis
cd ../ksql
#Starten Sie den ksql-Server
docker run -d -p $DOCKER_HOST_IP:8088:8088 \
-e KSQL_BOOTSTRAP_SERVERS=$KSQL_BOOTSTRAP_SERVERS \
-e KSQL_OPTS="-Dksql.service.id=ksql_service_3_ -Dlisteners=http://0.0.0.0:8088/" \
confluentinc/cp-ksql-server:5.3.1
#Docker-Prozessbestätigung
docker ps
# confluentinc/cp-ksql-server:5.3.1 Container läuft
#Starten Sie KSQL CLI
docker run -it confluentinc/cp-ksql-cli http://$DOCKER_HOST_IP:8088
Wenn der Start der KSQL-CLI erfolgreich ist, wird die folgende CLI gestartet.
Hier erstellen wir einen Stream und eine Tabelle für die Streaming-Verarbeitung aus den Themen 1 und 2.
ksql> show streams;
# Stream Name | Kafka Topic | Format
# ------------------------------------
# ------------------------------------
ksql> CREATE STREAM topic1_stream1 (userId INT, messageId VARCHAR, message STRUCT<word VARCHAR, wordId INT>) WITH (KAFKA_TOPIC = 'topic1', VALUE_FORMAT='JSON', KEY='userId');
ksql> show streams;
# Stream Name | Kafka Topic | Format
# ---------------------------------------
# TOPIC1_STREAM1 | topic1 | JSON
# ---------------------------------------
ksql> CREATE TABLE topic1_table1 (userId INT, wordCount INT, message STRUCT<word VARCHAR, wordId INT>) WITH (KAFKA_TOPIC = 'topic1', VALUE_FORMAT='JSON', KEY='userId');
ksql> show tables;
# Table Name | Kafka Topic | Format | Windowed
# -------------------------------------------------
# TOPIC1_TABLE1 | topic1 | JSON | false
# -------------------------------------------------
※wichtig Beim Erstellen von Stream und Tabelle gibt es einige Einschränkungen. Ich habe viel Versuch und Irrtum gebraucht, um diese Regel selbst zu lernen. ref) https://docs.confluent.io/current/ksql/docs/developer-guide/join-streams-and-tables.html
from Topic | from stream | from stream-stream | from table-table | from stream-table | |
---|---|---|---|---|---|
CREATE Stream | o | o | o | x | o |
CREATE Table | o | o | x | o | x |
Verwenden Sie wie SQL die Syntax "JOIN", um neue Streams und Tabellen aus zwei Ressourcen zu erstellen. Hierbei ist zu beachten, dass "JOIN" nur mit dem im KEY jeder Ressource festgelegten Wert möglich ist. Mit anderen Worten, im obigen Beispiel können Sie nicht mit zwei Spalten in einem aus Thema1 erstellten Stream und einem aus einem anderen Thema erstellten Stream "VERBINDEN". (Beispiel: Ein Ereignis mit userId = 2000 und wordCount = 5 kann kein neuer Stream sein.)
Wenn Sie mit mehreren Spalten "VERBINDEN" möchten, können Sie dies tun, indem Sie eine Spalte vorbereiten, die sie in der Themennachricht kombiniert, und sie als "SCHLÜSSEL" festlegen. (Beispiel: KEY => $ {userId} - $ {wordCount}
)
Außerdem muss das Ziel "KEY" sein, um "GROUP BY" in der Abfrage nach Tabelle auszuführen.
Abfragen an Stream werden immer nach aktualisierten Nachrichten abgefragt. Mit anderen Worten, Nachrichten, die vor dem Auslösen der Abfrage in Topic gepackt wurden, werden als Ergebnis der Abfrage von Stream nicht ausgegeben. Wie am Anfang dieses Kapitels erwähnt, werden Streams und Tabellen immer ausgeführt und wie Topic im Voraus erstellt. Wenn Sie KSQL zum ersten Mal berühren, sind Sie sich dessen möglicherweise nicht bewusst, und es bleibt möglicherweise die Frage: "Wofür wird es verwendet? Wann wird es verwendet?" In einem tatsächlichen System wird die Stream-Verarbeitung meiner Meinung nach selten über die CLI ausgeführt. Da sie jedoch praktisch ist, ist es möglich, das Abfrageergebnis auch für Nachrichten zu überprüfen, die sich bereits im Thema befinden, wie unten gezeigt, mit der Bedeutung des Debuggens. Lassen Sie uns die folgenden Werte in der KSQL-CLI festlegen.
ksql> SET 'auto.offset.reset'='earliest';
#Holen Sie sich alle Ereignisse in Stream
ksql> select * from topic1_stream1;
# 1576936834754 | 2019/12/21 | 3000 | 2019/12/21-23:00:34:614230 | {WORD=fuga, WORDID=4}
# 1576936837399 | 2019/12/21 | 1000 | 2019/12/21-23:00:37:275858 | {WORD=hello world, WORDID=5}
# 1576936837512 | 2019/12/21 | 1000 | 2019/12/21-23:00:37:275858 | {WORD=hoge, WORDID=3}
---
#Wie viele Nachrichten hat jeder Benutzer gleichzeitig in Stream gesendet?
ksql> select userId, count(messageId) from topic1_stream1 group by userId, messageId;
# 1000 | 3
# 3000 | 2
# 3000 | 1
Zusätzlich zu den in KSQL standardmäßig bereitgestellten Aggregatfunktionen können Sie auch vom Entwickler definierte verwenden. ref) https://docs.confluent.io/current/ksql/docs/developer-guide/syntax-reference.html#aggregate-functions
Darüber hinaus sind die folgenden Dokumente für die Aggregation sehr hilfreich. Es ist eine Vielzahl von Abfragen möglich, z. B. die Möglichkeit, Ereignisse zu bestimmten Zeiten zu trennen und zu aggregieren. ref) https://docs.confluent.io/current/ksql/docs/developer-guide/aggregate-streaming-data.html#aggregate-streaming-data-with-ksql
Versuchen Sie in diesem Zustand, eine Nachricht an topic1 zu senden, indem Sie den Anweisungen in # 2.2 folgen.
#3.4 Stream + Stream => Stream => Table Als erweiterte Version erstellen wir schließlich einen neuen Stream aus zwei Streams und fragen ihn ab, um eine Tabelle zu erstellen.
Nehmen wir als Beispiel eine Szene an, in der ein Benutzer per Lotterie zufällig ausgewählt wird und die Schlüsselwörter, die der Benutzer in den letzten 60 Minuten gesprochen hat, extrahiert werden. (Bitte vergib mir, weil ich kein gutes Beispiel gefunden habe ;;)
Zuerst kopieren wir topic1-Producer.py
und erstellen topic2-Producer.py
.
cp topic{1,2}-producer.py
topic2-producer.py
from kafka import KafkaProducer
from datetime import datetime
import subprocess
import json
import random
cwd_name = subprocess.check_output("pwd").decode('utf-8').rstrip('\n') + "/kafka-docker"
host_ip = subprocess.check_output("ipconfig getifaddr en0", shell=True).decode('utf-8').rstrip('\n')
netstat_result = subprocess.check_output("DOCKER_HOST_IP=${host_ip} && docker-compose exec kafka netstat |awk '{ print $5 }' |grep '^1.*:32.*'", cwd=cwd_name, shell=True).decode('utf-8').rstrip('\n')
kafka_ips = list(set(netstat_result.split('\n')))
# print(kafka_ips)
date = datetime.now().strftime("%Y/%m/%d")
user_id = random.choice([1000, 2000, 3000])
producer = KafkaProducer(bootstrap_servers=kafka_ips, value_serializer=lambda m: json.dumps(m).encode('utf-8'))
kafka_msg = {'userId': user_id}
producer.send('topic2', key=date.encode('utf-8'), value=kafka_msg).get(timeout=1)
Nachdem Sie die Datei wie oben erstellt haben, erstellen wir einen Stream mit userId
als KEY aus Topic1 und Topic2.
ksql> CREATE STREAM topic2_stream1 (userId INTEGER) WITH (KAFKA_TOPIC = 'topic2', VALUE_FORMAT='JSON', KEY='userId');
ksql> show streams;
# Stream Name | Kafka Topic | Format
# ---------------------------------------
# TOPIC2_STREAM1 | topic2 | JSON
# TOPIC1_STREAM1 | topic1 | JSON
# ---------------------------------------
Erstellen Sie dann einen neuen Stream aus der übereinstimmenden Benutzer-ID aus den beiden Streams. Da der Auslöser darin besteht, dass eine neue Nachricht (ein neues Ereignis) bei Topic2 eintrifft, wird Topic2 zum Stream auf der linken Seite. ref) https://docs.confluent.io/current/ksql/docs/developer-guide/join-streams-and-tables.html#semantics-of-stream-stream-joins
# Topic2 Stream + Topic1 Stream => New Stream
ksql> CREATE STREAM topic1_topic2_stream1 AS SELECT t2s1.userId as userId, t1s1.messageId, t1s1.message FROM topic2_stream1 t2s1 INNER JOIN topic1_stream1 t1s1 WITHIN 1 HOURS ON t2s1.userId = t1s1.userId;
# 3.SET bei 4'auto.offset.reset'='earliest';Wenn Sie dies getan haben, verwenden Sie den folgenden Befehl, um den Standard wiederherzustellen, sodass nur die Änderungen die Abfrageergebnisse sind.
ksql> SET 'auto.offset.reset'='latest';
ksql> select * from topic1_topic2_stream1;
Versuchen Sie in diesem Zustand, topic2-Producer.py
von einer anderen Registerkarte aus auszuführen.
Bei der Ausführung wird die Nachricht (Ereignis) angezeigt, die in der letzten Stunde bei "topic1_stream1" angekommen ist, wie unten gezeigt.
Zuletzt erstellen wir eine Tabelle aus der Abfrage für Stream von topic1_topic2_stream1
.
#Tabelle von Abfrage zu Stream erstellen
ksql> CREATE TABLE topic1_topic2_table1 AS SELECT userId, COLLECT_SET(message->word) as word FROM topic1_topic2_stream1 GROUP BY userId;
#Wenn Sie die folgende Abfrage ausführen, während Sie eine Nachricht an Topic2 senden, können Sie sehen, wie eine neue Nachricht (Ereignis) erstellt wird.
ksql> select * from topic1_topic2_table1;
# 1576940945888 | 2019/12/22 | 1000 | [hello, hello world, fuga, hoge, world]
# 1576941043356 | 2019/12/22 | 3000 | [hello, hello world, fuga]
Dies ist das Ende des praktischen Inhalts.