[PYTHON] De Kafka à KSQL - Construction d'environnement facile avec docker

Objectif

Cet article vise à toucher Apache Kafka de manière pratique et à aider autant de personnes que possible à comprendre les avantages de Kafka. J'omettrai la mise en œuvre détaillée et le mécanisme de Kafka, et j'espère que ce sera l'occasion d'élargir l'image, par exemple quel type de traitement est possible en utilisant réellement Kafka et si ce sera une solution aux problèmes existants. ..

Introduction à Kafka

Si vous comprenez les bases de Kafka, vous pouvez l'ignorer.

Kafka a été annoncé par LinkedIn en 2011 comme une "file d'attente de messagerie distribuée". Actuellement, la page officielle de Kafka indique "Distributed Streaming Platform", mais fondamentalement, elle devrait être reconnue comme une file d'attente de messagerie.

Il présente les caractéristiques suivantes et est adopté dans divers systèmes à grande échelle en tant que plate-forme de messagerie flexible, évolutive et tolérante aux pannes.

La communauté mature fournit également des API dans divers langages et une multitude de plugins appelés Kafka Connect, offrant un environnement convivial pour les développeurs.

Terminologie Kafka et mécanique simple

Kafka utilise des termes en fonction de son rôle et a la structure générale suivante. producer-broker-consumer.png Expéditeur du message: producteur Destinataire du message: consommateur Médiateur de messages: Courtier pubsub.png Chaque message en file d'attente: Sujet Mise en file d'attente des rubriques fragmentées: partition

zookeeper.png De plus, la gestion des clusters de Kafka nécessite le démarrage de Zookeeper.

Mains sur

Jusqu'à présent, déplaçons nos mains. Cette fois, nous procéderons à des travaux pratiques dans l'environnement suivant.

macOS: 10.14 python: 3.7.4 docker: 2.1.0.5 kafka-docker: https://github.com/wurstmeister/kafka-docker KSQL: https://github.com/confluentinc/ksql

Démarrer # 1 Kafka sur Docker

# 1.1 Préparation

Tout d'abord, clonons localement kafka-docker. Créez un répertoire dans votre environnement local et clonez-le à partir de github.

mkdir ~/kafka && cd ~/kafka
git clone https://github.com/wurstmeister/kafka-docker.git
cd kafka-docker

Puisque kafka-docker fournit docker-compose.yml, je voudrais exécuter docker-compose up -d tel quel, mais ce fichier a besoin de quelques modifications. ref) https://github.com/wurstmeister/kafka-docker#advertised-hostname Vous devez configurer l'adresse IP publiée comme décrit dans.

Remplacez l'adresse IP directement écrite comme KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100 par la variable d'environnement DOCKER_HOST_IP.

sed -i -e 's/KAFKA_ADVERTISED_HOST_NAME:.*/KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_HOST_IP}/g' docker-compose.yml

Si vous souhaitez générer une rubrique à l'avance dans Kafka qui est démarrée ensuite, il est pratique de définir la valeur suivante. ref) https://github.com/wurstmeister/kafka-docker#automatically-create-topics Insérez ce qui suit dans la ligne suivant le KAFKA_ADVERTISED_HOST_NAME modifié que vous avez modifié précédemment.

KAFKA_CREATE_TOPICS: "topic1:3:2,topic2:3:2

C'est tout pour la préparation. Maintenant, commençons Kafka.

# 1.2 Démarrer Kafka

# .C'est une bonne idée de le définir lors du démarrage d'un shell tel que bashrc.
export DOCKER_HOST_IP=$(ipconfig getifaddr en0)

docker-compose up -d --build
docker-compose ps
#Les numéros de port peuvent varier.
#           Name                        Command               State                         Ports
# ----------------------------------------------------------------------------------------------------------------------
# kafka-docker_kafka_1       start-kafka.sh                   Up      0.0.0.0:32771->9092/tcp
# kafka-docker_zookeeper_1   /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp

Augmentez le nombre de courtiers à trois.

docker-compose scale kafka=3
docker-compose ps
#            Name                        Command               State                         Ports
# ----------------------------------------------------------------------------------------------------------------------
# kafka-docker_kafka_1       start-kafka.sh                   Up      0.0.0.0:32771->9092/tcp
# kafka-docker_kafka_2       start-kafka.sh                   Up      0.0.0.0:32772->9092/tcp
# kafka-docker_kafka_3       start-kafka.sh                   Up      0.0.0.0:32773->9092/tcp
# kafka-docker_zookeeper_1   /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp

# 1.3 Vérification du fonctionnement de Kafka

Maintenant, exploitons Kafka avec la CLI.

#Accès à l'intérieur du conteneur Docker
./start-kafka-shell.sh $DOCKER_HOST_IP

#Les informations sur le courtier sont sorties
bash-4.4# broker-list.sh
# 10.XXX.XXX.XXX:32772,10.XXX.XXX.XXX:32773
# 10.XXX.XXX.XXX:32771

# docker-compose.yml KAFKA_CREATE_Confirmez que le sujet spécifié dans TOPICS est généré
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server `broker-list.sh`
# topic1
# topic2

#Créer un sujet
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --create --topic topic-from-cli --partitions 3 --replication-factor 2 --bootstrap-server `broker-list.sh`
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server `broker-list.sh`
# topic-from-cli
# topic1
# topic2

C'est la fin d'un simple contrôle de fonctionnement Kafka. Dans le référentiel cloné, il existe un fichier sh que vous pouvez essayer Producer et Consumer avec CLI, c'est donc une bonne idée de l'essayer également. Je pense qu'il est rare d'implémenter Producer / Consumer via CLI dans le système réel, alors créons un Producer en utilisant Python3 afin que vous puissiez envoyer un message à Topic via l'application.

# 2 Envoyer un message à Kafka-Implementation of Producer

# 2.1 Préparation

Installons la bibliothèque Kafka pour Python3. Veuillez installer chaque module manquant comme il convient.

cd ~/kafka
pip install kafka-python

Ensuite, créez les fichiers suivants. Je n'écris généralement pas Python lui-même. Il s'agit simplement d'un code de niveau de contrôle de fonctionnement.

topic1-producer.py


rom kafka import KafkaProducer
from datetime import datetime
import subprocess
import json
import random

cwd_name = subprocess.check_output("pwd").decode('utf-8').rstrip('\n') + "/kafka-docker"
host_ip = subprocess.check_output("ipconfig getifaddr en0", shell=True).decode('utf-8').rstrip('\n')
netstat_result = subprocess.check_output("DOCKER_HOST_IP=${host_ip} && docker-compose exec kafka netstat |awk '{ print $5 }' |grep '^1.*:32.*'", cwd=cwd_name, shell=True).decode('utf-8').rstrip('\n')
kafka_ips = list(set(netstat_result.split('\n')))
# print(kafka_ips)

date = datetime.now().strftime("%Y/%m/%d")
messageId = datetime.now().strftime("%Y/%m/%d-%H:%M:%S:%f")

user_id = random.choice([1000, 2000, 3000])
word_id = random.randint(1,5)
word_pattern = {1: 'hello', 2: 'world', 3: 'hoge', 4: 'fuga', 5: 'hello world'}
word_count = random.randint(1,3)
word_keys = random.sample(word_pattern.keys(), word_count)

producer = KafkaProducer(bootstrap_servers=kafka_ips, value_serializer=lambda m: json.dumps(m).encode('utf-8'))

for word_type in  word_keys:
    kafka_msg = {'userId': user_id, 'messageId': messageId, 'message': {'wordId': word_type, 'word': word_pattern[word_type]}}
    producer.send('topic1', key=date.encode('utf-8'), value=kafka_msg).get(timeout=1)

# 2.2 Envoyer un message à Kafka

Utilisez deux onglets de terminal. L'un est pour vérifier les messages dans le sujet et l'autre pour envoyer des messages.

# tab1
#Lancez Kafka CLI
./start-kafka-shell.sh $DOCKER_HOST_IP

#Démarrage consommateur
# --from-Avec l'option de début, il est possible d'afficher les messages qui sont déjà arrivés au sujet.
bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=topic1 --from-beginning --bootstrap-server `broker-list.sh`

---

# tab2
python topic1-producer.py

Lorsque vous exécutez le script Python de tab2, ce sera du côté tab1.

{"userId": 1000, "messageId": "2019/12/21-22:46:03:131468", "message": {"wordId": 2, "word": "world"}}

Vous devriez être en mesure de voir le message circuler comme ça. Si vous exécutez le script comme ci-dessous, vous verrez que le message arrive toutes les 3 secondes.

# bash
while true; do python topic1-producer.py; sleep 3s; done;

# fish
while true; python topic1-producer.py; sleep 3s; end;

# 2.3 Arrivée du message

producer.gif

# 3 Implémentation du traitement Streaming à l'aide de KSQL

Ensuite, effectuons le traitement en continu. Il n'y a rien de spécial à propos du streaming, et tout le message (événement) qui circule sans fin vers Topic est simplement appelé «streaming». KSQL est une API qui vous permet d'interroger des requêtes de type SQL pour ces événements fluides pour le filtrage et l'agrégation. Il est possible de changer les données continues des messages circulant vers Topic en une autre donnée continue (Stream) ou de données agrégées (Table), et d'utiliser ces données comme nouveau sujet pour le traitement par une autre application. Veuillez vous référer au lien ci-dessous pour plus de détails.

ref) https://kafka.apache.org/documentation/streams/ ref) https://www.youtube.com/watch?v=DPGn-j7yD68

Stream et Table sont fondamentalement (24/7) toujours en cours d'exécution, donc si vous reconnaissez qu'ils sont traités de la même manière que Topic, il sera plus facile d'entrer.

# 3.1 Préparation

Tout d'abord, préparez KSQL développé par confluent.

cd ~/kafka
git clone https://github.com/confluentinc/ksql.git
cd ksql

# 3.2 Démarrage du serveur KSQL / CLI KSQL

# kafka-Revenir au répertoire docker
cd ../kafka-docker
#adresse IP en cours d'exécution de kafka+Obtenir le numéro de port
export KSQL_BOOTSTRAP_SERVERS=(docker-compose exec kafka netstat |awk '{ print $5 }' |grep '^1.*:32.*' |sort |uniq |tr '\n' ',')
#Déplacer vers le répertoire ksql
cd ../ksql
#démarrer le serveur ksql
docker run -d -p $DOCKER_HOST_IP:8088:8088 \
                 -e KSQL_BOOTSTRAP_SERVERS=$KSQL_BOOTSTRAP_SERVERS \
                 -e KSQL_OPTS="-Dksql.service.id=ksql_service_3_  -Dlisteners=http://0.0.0.0:8088/" \
                 confluentinc/cp-ksql-server:5.3.1
#confirmation du processus docker
docker ps
# confluentinc/cp-ksql-server:5.3.1 conteneur fonctionne

#Démarrez KSQL CLI
docker run -it confluentinc/cp-ksql-cli http://$DOCKER_HOST_IP:8088

Si le démarrage de KSQL CLI réussit, la CLI suivante sera lancée. ksql.png

# 3.3 Créer un flux

Ici, créons un flux et une table pour le traitement en continu à partir des sujets 1 et 2.

ksql> show streams;
#  Stream Name | Kafka Topic | Format
# ------------------------------------
# ------------------------------------

ksql> CREATE STREAM topic1_stream1 (userId INT, messageId VARCHAR, message STRUCT<word VARCHAR, wordId INT>) WITH (KAFKA_TOPIC = 'topic1', VALUE_FORMAT='JSON', KEY='userId');
ksql> show streams;
#  Stream Name    | Kafka Topic | Format
# ---------------------------------------
#  TOPIC1_STREAM1 | topic1      | JSON
# ---------------------------------------

ksql> CREATE TABLE topic1_table1 (userId INT, wordCount INT, message STRUCT<word VARCHAR, wordId INT>) WITH (KAFKA_TOPIC = 'topic1', VALUE_FORMAT='JSON', KEY='userId');
ksql> show tables;
#  Table Name    | Kafka Topic | Format | Windowed
# -------------------------------------------------
#  TOPIC1_TABLE1 | topic1      | JSON   | false
# -------------------------------------------------

※important Il existe certaines restrictions lors de la création d'un flux et d'une table. Il m'a fallu beaucoup d'essais et d'erreurs pour apprendre moi-même cette règle. ref) https://docs.confluent.io/current/ksql/docs/developer-guide/join-streams-and-tables.html

from Topic from stream from stream-stream from table-table from stream-table
CREATE Stream o o o x o
CREATE Table o o x o x

Comme SQL, utilisez la syntaxe JOIN pour créer de nouveaux flux et tables à partir de deux ressources. Il convient de noter ici que «JOIN» n'est possible qu'avec la valeur définie dans la KEY de chaque ressource. En d'autres termes, dans l'exemple ci-dessus, vous ne pouvez pas JOIN avec deux colonnes dans un Stream créé à partir de topic1 et un Stream créé à partir d'une autre rubrique. (Exemple: un événement avec userId = 2000 et wordCount = 5 ne peut pas être un nouveau flux.)

Si vous souhaitez «JOIN» avec plusieurs colonnes, vous pouvez le gérer en préparant une colonne qui les combine dans le message Topic et en le définissant comme «KEY». (Exemple: KEY => $ {userId} - $ {wordCount})

De plus, la cible doit être «KEY» pour faire «GROUP BY» dans la requête à Table.

# 3.4 Requête pour diffuser

Les requêtes à diffuser sont toujours interrogées pour les messages mis à jour. En d'autres termes, les messages emballés dans Topic avant le moment où la requête est lancée ne seront pas générés à la suite de l'interrogation de Stream. Comme mentionné au début de ce chapitre, les Streams et les Tables sont toujours en cours d'exécution et sont créés à l'avance comme Topic. Lorsque vous touchez KSQL pour la première fois, vous n'en avez peut-être pas conscience et vous vous posez peut-être la question "À quoi sert-il? Quand est-il utilisé?" Dans un système réel, je pense que le traitement Stream est rarement effectué via CLI, mais comme il est pratique, il est possible de vérifier le résultat de la requête même pour les messages déjà dans le sujet comme indiqué ci-dessous avec la signification du débogage. Définissons les valeurs suivantes dans la CLI KSQL.

ksql> SET 'auto.offset.reset'='earliest';
#Obtenez tous les événements dans Stream
ksql> select * from topic1_stream1;
# 1576936834754 | 2019/12/21 | 3000 | 2019/12/21-23:00:34:614230 | {WORD=fuga, WORDID=4}
# 1576936837399 | 2019/12/21 | 1000 | 2019/12/21-23:00:37:275858 | {WORD=hello world, WORDID=5}
# 1576936837512 | 2019/12/21 | 1000 | 2019/12/21-23:00:37:275858 | {WORD=hoge, WORDID=3}
---
#Combien de messages chaque utilisateur a envoyés en même temps dans Stream
ksql> select userId, count(messageId) from topic1_stream1 group by userId,  messageId;
# 1000 | 3
# 3000 | 2
# 3000 | 1

En plus des fonctions d'agrégation fournies par défaut dans KSQL, vous pouvez également utiliser celles définies par le développeur. ref) https://docs.confluent.io/current/ksql/docs/developer-guide/syntax-reference.html#aggregate-functions

De plus, les documents suivants seront très utiles pour l'agrégation. Un très large éventail de requêtes est possible, comme la possibilité de séparer des événements à des moments précis et de les agréger. ref) https://docs.confluent.io/current/ksql/docs/developer-guide/aggregate-streaming-data.html#aggregate-streaming-data-with-ksql

Dans cet état, essayez d'envoyer un message à topic1 en suivant la procédure dans # 2.2. Stream.gif

#3.4 Stream + Stream => Stream => Table Enfin, en tant que version avancée, créons un nouveau Stream à partir de deux Stream et interrogons-le pour créer une Table.

À titre d'exemple, supposons une scène dans laquelle un utilisateur est sélectionné au hasard par loterie et les mots-clés que l'utilisateur a prononcés au cours des 60 dernières minutes sont extraits. (Veuillez me pardonner car je n'ai pas trouvé un bon exemple ;;)

Copions d'abord topic1-producteur.py et créons topic2-producteur.py

cp topic{1,2}-producer.py

topic2-producer.py


from kafka import KafkaProducer
from datetime import datetime
import subprocess
import json
import random

cwd_name = subprocess.check_output("pwd").decode('utf-8').rstrip('\n') + "/kafka-docker"
host_ip = subprocess.check_output("ipconfig getifaddr en0", shell=True).decode('utf-8').rstrip('\n')
netstat_result = subprocess.check_output("DOCKER_HOST_IP=${host_ip} && docker-compose exec kafka netstat |awk '{ print $5 }' |grep '^1.*:32.*'", cwd=cwd_name, shell=True).decode('utf-8').rstrip('\n')
kafka_ips = list(set(netstat_result.split('\n')))
# print(kafka_ips)

date = datetime.now().strftime("%Y/%m/%d")

user_id = random.choice([1000, 2000, 3000])
producer = KafkaProducer(bootstrap_servers=kafka_ips, value_serializer=lambda m: json.dumps(m).encode('utf-8'))
kafka_msg = {'userId': user_id}
producer.send('topic2', key=date.encode('utf-8'), value=kafka_msg).get(timeout=1)

Après avoir créé le fichier comme ci-dessus, créons un Stream à partir de Topic1 et Topic2 avec ʻuserId` comme clé.

ksql> CREATE STREAM topic2_stream1 (userId INTEGER) WITH (KAFKA_TOPIC = 'topic2', VALUE_FORMAT='JSON', KEY='userId');
ksql> show streams;
#  Stream Name    | Kafka Topic | Format
# ---------------------------------------
#  TOPIC2_STREAM1 | topic2      | JSON
#  TOPIC1_STREAM1 | topic1      | JSON
# ---------------------------------------

Ensuite, créez un nouveau Stream à partir de l 'ʻuserId` correspondant des deux Streams. Puisque le déclencheur est qu'un nouveau message (événement) arrive à Topic2, Topic2 devient le Stream sur le côté GAUCHE. ref) https://docs.confluent.io/current/ksql/docs/developer-guide/join-streams-and-tables.html#semantics-of-stream-stream-joins

# Topic2 Stream + Topic1 Stream => New Stream
ksql> CREATE STREAM topic1_topic2_stream1 AS SELECT t2s1.userId as userId, t1s1.messageId, t1s1.message FROM topic2_stream1 t2s1 INNER JOIN topic1_stream1 t1s1 WITHIN 1 HOURS ON t2s1.userId = t1s1.userId;

# 3.SET à 4'auto.offset.reset'='earliest';Si vous avez fait cela, utilisez la commande ci-dessous pour restaurer la valeur par défaut afin que seules les modifications soient les résultats de la requête.
ksql> SET 'auto.offset.reset'='latest';

ksql> select * from topic1_topic2_stream1;

Dans cet état, essayez d'exécuter topic2-Producer.py à partir d'un autre onglet. Une fois exécuté, le message (événement) arrivé à topic1_stream1 au cours de la dernière heure sera affiché comme indiqué ci-dessous. StreamJoin.gif

Enfin, créons une table à partir de la requête pour Stream de topic1_topic2_stream1.

#Créer une table de la requête au flux
ksql> CREATE TABLE topic1_topic2_table1 AS SELECT userId, COLLECT_SET(message->word) as word FROM topic1_topic2_stream1 GROUP BY userId;

#Si vous exécutez la requête suivante lors de l'envoi d'un message à Topic2, vous pouvez voir comment un nouveau message (événement) est créé.
ksql> select * from topic1_topic2_table1;
# 1576940945888 | 2019/12/22 | 1000 | [hello, hello world, fuga, hoge, world]
# 1576941043356 | 2019/12/22 | 3000 | [hello, hello world, fuga]

C'est la fin du contenu pratique.

Recommended Posts

De Kafka à KSQL - Construction d'environnement facile avec docker
De la construction de l'environnement au déploiement pour flask + Heroku avec Docker
De la construction d'environnement Python à la construction d'environnement virtuel avec anaconda
Construction d'un environnement Jupyter facile avec Cloud9
De Ubuntu 20.04 introduction à la construction d'environnement
Construisez un environnement Python + bouteille + MySQL avec Docker sur RaspberryPi3! [Construction facile]
Réaliser la construction d'environnement pour "Deep Learning from scratch" avec docker et Vagrant
Collecter des informations depuis Twitter avec Python (construction de l'environnement)
De la création de l'environnement Kivy à l'affichage de Hello World
Impossible de se connecter à MySQL depuis l'environnement Docker (Debian)
De 0 à la construction de l'environnement de développement Django à l'exploitation de base
Construction de l'environnement: GCP + Docker
Construction d'un environnement d'analyse de données Python facile avec Windows10 Pro x VS Code x Docker
Construction d'environnement virtuel avec Docker + Flask (Python) + notebook Jupyter
[Python] Construction de l'environnement OpenCV avec Docker (cv2.imshow () fonctionne également)
Construction de l'environnement de contrôle à distance Pepper-kun avec Docker + IPython Notebook
Construction de l'environnement de développement Python 2020 [De l'installation de Python à l'introduction à la poésie]
Construction d'environnement avec anyenv + pyenv (migrer depuis pyenv uniquement (Mac))
Exemple d'environnement pytest pour réparer la base de données avec Docker
Procédure pour convertir un fichier python en exe à partir de la construction de l'environnement Ubunts
Environnement toxique facile avec Jenkins
Rendre avec la syntaxe facile
Construction de l'environnement Docker + Django + React
Préparer l'environnement python3 avec Docker
OpenJTalk sur Windows10 (parler japonais avec Python depuis la construction de l'environnement)
Créez un environnement pour "Deep Learning from scratch" avec Docker
Commencez avec Python! ~ ① Construction de l'environnement ~
Easy Slackbot avec Docker et Errbot
Construction de l'environnement Ruby avec AWS EC2
Construire un environnement Mysql + Python avec docker
Comment utiliser Jupyter Notebook sans polluer votre environnement avec Docker
Installez facilement pyspark avec conda
[Linux] Construction de l'environnement Docker sur Redhat
Automatisez la construction d'environnement avec ShellScript
Construction de l'environnement Python3 avec pyenv-virtualenv (CentOS 7.3)
Construction d'environnement Postgres avec Docker J'ai eu un peu de mal, alors notez
Utilisation de Chainer avec CentOS7 [Construction de l'environnement]
Dessinez facilement des graphiques avec matplotlib
Construction de l'environnement pytorch @ python3.8 avec pipenv
De l'installation ubuntu à l'exécution de kinect avec docker et ros (présentation)
De la construction de l'environnement PyCUDA à la programmation GPGPU sur Mac (MacOS 10.12 Sierra)
Reconstruisez l'environnement de développement de Django avec Docker! !! !! !!
[docker] Construction de l'environnement python3.5 + numpy + matplotlib
Environnement de déploiement facile avec gaffer + tissu
[Google App Engine] Flux de la construction de l'environnement de développement à la création de l'application
Créer un environnement avec pyenv et pyenv-virtualenv
Tutoriel d'apprentissage en profondeur de la construction d'environnement
Je voulais utiliser le notebook jupyter avec docker dans l'environnement pip (opticspy)
Préparer un environnement pour toucher les fichiers au format grib2 avec python (édition Docker)
Maintenance de l'environnement réalisée avec Docker (je souhaite post-traiter GrADS en Python
[Ubuntu 18.04] Créer un environnement Python avec pyenv + pipenv
Créer un environnement Jupyter Lab (Python) avec Docker
[Note] [PyTorch] De l'installation à la simplicité d'utilisation
Vue.js + Mémorandum de construction de l'environnement Flask ~ avec Anaconda3 ~
Comment créer un environnement NVIDIA Docker
[Python] Road to snakes (1) Construction de l'environnement