[PYTHON] From Kafka to KSQL: Easy environment construction with Docker

Purpose

This article aims to give you hands-on experience with Apache Kafka so that as many people as possible can appreciate its benefits. I will skip Kafka's detailed implementation and internals. Instead, I hope that actually using Kafka gives you a clearer picture of what kind of processing it makes possible and whether it could solve problems you already have.

Introduction to Kafka

If you already understand the basics of Kafka, you can skip this section.

Kafka was announced by LinkedIn in 2011 as a "Distributed Messaging Queue". Kafka's official page now describes it as a "Distributed Streaming Platform", but at its core it is best understood as a message queue.

It has the following features and is used in various large-scale systems as a flexible, scalable, and fault-tolerant messaging platform.

- **Pub / Sub model** => Multiple apps can receive the same message (flexible and scalable)
- **Cluster configuration with multiple brokers** => Add servers as message volume grows to achieve high throughput
- **Persistence of message data by saving to disk** => Messages can be reprocessed by reading the same message again
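To make the Pub / Sub and persistence points a little more concrete, here is a toy in-memory sketch in Python. It is not Kafka code: `TopicLog` and its methods are hypothetical names made up for illustration, and a real broker keeps the log on disk across a cluster.

```python
class TopicLog:
    """Toy append-only log; a hypothetical stand-in for one Kafka topic partition."""

    def __init__(self):
        self._messages = []   # messages are kept after being read
        self._offsets = {}    # next offset to read, per consumer group

    def publish(self, message):
        """Append a message and return its offset."""
        self._messages.append(message)
        return len(self._messages) - 1

    def poll(self, group):
        """Return unread messages for this group and advance its offset."""
        start = self._offsets.get(group, 0)
        batch = self._messages[start:]
        self._offsets[group] = len(self._messages)
        return batch

    def seek(self, group, offset):
        """Move a group's offset so that messages can be reprocessed."""
        self._offsets[group] = offset


log = TopicLog()
log.publish("order-1")
log.publish("order-2")

billing = log.poll("billing")      # both groups receive the same messages
shipping = log.poll("shipping")
log.seek("billing", 0)             # rewind to reprocess from the start
replayed = log.poll("billing")
print(billing, shipping, replayed)
```

Each group tracks its own position, so reading never removes a message, and rewinding the position is all it takes to process the same messages again.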

In addition, the mature community provides APIs in various languages and a wealth of plugins called Kafka Connect, providing a developer-friendly environment.

Kafka terminology and simple mechanics

Kafka names each component by its role. The overall structure is roughly as follows.

producer-broker-consumer.png

- Message sender: Producer
- Message receiver: Consumer
- Message relay: Broker

pubsub.png

- Queue for each kind of message: Topic
- Sharded queue within a Topic: Partition

zookeeper.png

In addition, ZooKeeper must be running to manage the Kafka cluster.
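As a rough illustration of how a Topic is sharded into Partitions, the sketch below maps a message key to a partition index. The function name `choose_partition` is hypothetical, and CRC32 is only a stand-in hash for this example; Kafka's default partitioner uses a different hash function (murmur2).

```python
import zlib


def choose_partition(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition index.

    Assumption: CRC32 is a stand-in hash for illustration only;
    Kafka's default partitioner uses murmur2.
    """
    return zlib.crc32(key) % num_partitions


keys = [b"2019/12/21", b"2019/12/22", b"2019/12/21"]
partitions = [choose_partition(k, 3) for k in keys]
print(partitions)  # the first and third entries are always equal
```

The point is only that messages with the same key land in the same Partition, which is what preserves their relative order.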

Hands-on

With that background, let's get hands-on. We will work in the following environment.

- macOS: 10.14
- python: 3.7.4
- docker: 2.1.0.5
- kafka-docker: https://github.com/wurstmeister/kafka-docker
- KSQL: https://github.com/confluentinc/ksql

# 1 Start Kafka on docker

# 1.1 Preparation

First, let's clone kafka-docker locally. Create a directory in your local environment and clone it from github.

mkdir ~/kafka && cd ~/kafka
git clone https://github.com/wurstmeister/kafka-docker.git
cd kafka-docker

kafka-docker ships with a docker-compose.yml, so it would be nice to run docker-compose up -d as is, but the file needs a small modification first. As described in the README, you need to configure the advertised IP address.
ref) https://github.com/wurstmeister/kafka-docker#advertised-hostname

Replace the hard-coded IP address in KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100 with the environment variable DOCKER_HOST_IP.

sed -i -e 's/KAFKA_ADVERTISED_HOST_NAME:.*/KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_HOST_IP}/g' docker-compose.yml
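If you would rather check what the substitution does before touching the file, the same replacement can be sketched in Python. The `compose_text` string is a hypothetical excerpt used only for illustration, not the full docker-compose.yml.

```python
import re

# Hypothetical excerpt of docker-compose.yml, for illustration only.
compose_text = """\
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
"""

# Same substitution as the sed command: replace everything after the key
# up to the end of the line. A callable replacement avoids any escape
# handling in the new text.
new_line = "KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_HOST_IP}"
updated = re.sub(r"KAFKA_ADVERTISED_HOST_NAME:.*", lambda _m: new_line, compose_text)
print(updated)
```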

If you want Topics to be created in advance when Kafka starts, it is convenient to set the following value. Insert it on the line after the KAFKA_ADVERTISED_HOST_NAME entry you just modified.
ref) https://github.com/wurstmeister/kafka-docker#automatically-create-topics

KAFKA_CREATE_TOPICS: "topic1:3:2,topic2:3:2"
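Each entry in this value follows the form name:partitions:replicas, as described in the kafka-docker README. A small sketch that spells out how the string is read (`TopicSpec` and `parse_create_topics` are hypothetical helper names, not part of kafka-docker):

```python
from typing import List, NamedTuple


class TopicSpec(NamedTuple):
    name: str
    partitions: int
    replicas: int


def parse_create_topics(value: str) -> List[TopicSpec]:
    """Parse comma-separated 'name:partitions:replicas' entries."""
    specs = []
    for entry in value.split(","):
        name, partitions, replicas = entry.strip().split(":")[:3]
        specs.append(TopicSpec(name, int(partitions), int(replicas)))
    return specs


specs = parse_create_topics("topic1:3:2,topic2:3:2")
for spec in specs:
    print(f"{spec.name}: {spec.partitions} partitions, replication factor {spec.replicas}")
```

So the setting above asks for two Topics, each with 3 Partitions and a replication factor of 2.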

That's all for preparation. Now let's start Kafka.

# 1.2 Start Kafka

# It is convenient to set this in a shell startup file such as .bashrc
export DOCKER_HOST_IP=$(ipconfig getifaddr en0)

docker-compose up -d --build
docker-compose ps
#The port numbers can be different.
#           Name                        Command               State                         Ports
# ----------------------------------------------------------------------------------------------------------------------
# kafka-docker_kafka_1       start-kafka.sh                   Up      0.0.0.0:32771->9092/tcp
# kafka-docker_zookeeper_1   /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp

Increase the number of brokers to three.

docker-compose scale kafka=3
docker-compose ps
#            Name                        Command               State                         Ports
# ----------------------------------------------------------------------------------------------------------------------
# kafka-docker_kafka_1       start-kafka.sh                   Up      0.0.0.0:32771->9092/tcp
# kafka-docker_kafka_2       start-kafka.sh                   Up      0.0.0.0:32772->9092/tcp
# kafka-docker_kafka_3       start-kafka.sh                   Up      0.0.0.0:32773->9092/tcp
# kafka-docker_zookeeper_1   /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:2181->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp

# 1.3 Kafka operation check

Now, let's actually operate Kafka with the CLI.

#Access inside docker container
./start-kafka-shell.sh $DOCKER_HOST_IP

#Broker information is output
bash-4.4# broker-list.sh
# 10.XXX.XXX.XXX:32772,10.XXX.XXX.XXX:32773
# 10.XXX.XXX.XXX:32771

# Confirm that the Topics specified in KAFKA_CREATE_TOPICS in docker-compose.yml were created
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server `broker-list.sh`
# topic1
# topic2

#Creating Topic
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --create --topic topic-from-cli --partitions 3 --replication-factor 2 --bootstrap-server `broker-list.sh`
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server `broker-list.sh`
# topic-from-cli
# topic1
# topic2

That concludes the simple Kafka operation check. The cloned repository also contains shell scripts for trying out a Producer and Consumer from the CLI, so give those a try as well. Since real systems rarely implement a Producer or Consumer through the CLI, let's write a Producer in Python 3 so that an application can send messages to a Topic.

# 2 Send a message to Kafka: Producer implementation

# 2.1 Preparation

Let's install the Kafka library for Python3. Please install each missing module as appropriate.

cd ~/kafka
pip install kafka-python

Next, create the following file. I don't normally write Python, so treat this as code that is only good enough for an operation check.

topic1-producer.py


from kafka import KafkaProducer
from datetime import datetime
import os
import subprocess
import json
import random

# Directory of the cloned kafka-docker repository (run this script from ~/kafka)
cwd_name = os.path.join(os.getcwd(), "kafka-docker")
host_ip = subprocess.check_output("ipconfig getifaddr en0", shell=True).decode('utf-8').rstrip('\n')
# Pass DOCKER_HOST_IP through the environment; ${host_ip} inside the shell string did not refer to the Python variable
compose_env = dict(os.environ, DOCKER_HOST_IP=host_ip)
netstat_result = subprocess.check_output("docker-compose exec kafka netstat |awk '{ print $5 }' |grep '^1.*:32.*'", cwd=cwd_name, shell=True, env=compose_env).decode('utf-8').rstrip('\n')
kafka_ips = list(set(netstat_result.split('\n')))
# print(kafka_ips)

date = datetime.now().strftime("%Y/%m/%d")
messageId = datetime.now().strftime("%Y/%m/%d-%H:%M:%S:%f")

user_id = random.choice([1000, 2000, 3000])
word_id = random.randint(1,5)
word_pattern = {1: 'hello', 2: 'world', 3: 'hoge', 4: 'fuga', 5: 'hello world'}
word_count = random.randint(1,3)
word_keys = random.sample(list(word_pattern), word_count)  # newer Python versions require a sequence here

producer = KafkaProducer(bootstrap_servers=kafka_ips, value_serializer=lambda m: json.dumps(m).encode('utf-8'))

for word_type in  word_keys:
    kafka_msg = {'userId': user_id, 'messageId': messageId, 'message': {'wordId': word_type, 'word': word_pattern[word_type]}}
    producer.send('topic1', key=date.encode('utf-8'), value=kafka_msg).get(timeout=1)
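To see what actually goes over the wire, the message construction and serialization can be tried without a broker. This is a minimal sketch: `serialize`, `deserialize`, and `build_messages` are hypothetical helper names that mirror the value_serializer lambda and the loop in the script above.

```python
import json


def serialize(value: dict) -> bytes:
    """Same idea as the value_serializer lambda in the script."""
    return json.dumps(value).encode("utf-8")


def deserialize(raw: bytes) -> dict:
    """Inverse of serialize; what a consumer would apply to the value."""
    return json.loads(raw.decode("utf-8"))


def build_messages(user_id, message_id, word_keys, word_pattern):
    """Build one message dict per selected word, as the loop in the script does."""
    return [
        {"userId": user_id, "messageId": message_id,
         "message": {"wordId": key, "word": word_pattern[key]}}
        for key in word_keys
    ]


word_pattern = {1: "hello", 2: "world", 3: "hoge", 4: "fuga", 5: "hello world"}
messages = build_messages(1000, "2019/12/21-22:46:03:131468", [2, 5], word_pattern)
payloads = [serialize(m) for m in messages]
print(payloads[0])
```

One run of the script therefore sends one to three messages that share the same userId and messageId and differ only in the word.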

# 2.2 Send a message to Kafka

Use two terminal tabs. One is for checking messages in the topic and the other is for sending messages.

# tab1
#Launch Kafka CLI
./start-kafka-shell.sh $DOCKER_HOST_IP

#Consumer startup
# With the --from-beginning option, messages that have already arrived at the Topic are also displayed.
bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=topic1 --from-beginning --bootstrap-server `broker-list.sh`

---

# tab2
python topic1-producer.py

When you run the Python script in tab2, a message like the following appears on the tab1 side.

{"userId": 1000, "messageId": "2019/12/21-22:46:03:131468", "message": {"wordId": 2, "word": "world"}}

You should see messages flowing in like this. If you run the script in a loop as below, a message arrives roughly every 3 seconds.

# bash
while true; do python topic1-producer.py; sleep 3s; done;

# fish
while true; python topic1-producer.py; sleep 3s; end;
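The console consumer in tab1 could also be swapped for a small Python consumer. The sketch below only shows the decoding side and runs without a broker: `handle_records` and `FakeRecord` are hypothetical names, and the sample record is made-up data.

```python
import json
from collections import namedtuple


def handle_records(records):
    """Decode JSON values from an iterable of records and return them as dicts.

    Each record only needs a `.value` attribute holding UTF-8 JSON bytes.
    """
    decoded = []
    for record in records:
        message = json.loads(record.value.decode("utf-8"))
        print(message["userId"], message["message"]["word"])
        decoded.append(message)
    return decoded


# Stand-in records so that the sketch runs without a broker (hypothetical data).
FakeRecord = namedtuple("FakeRecord", ["key", "value"])
sample = [
    FakeRecord(
        b"2019/12/21",
        b'{"userId": 1000, "messageId": "m1", "message": {"wordId": 2, "word": "world"}}',
    ),
]
result = handle_records(sample)
```

With a running broker, the same function could presumably be given a `KafkaConsumer('topic1', bootstrap_servers=kafka_ips, auto_offset_reset='earliest')` from kafka-python, which is iterable and yields records with `key` and `value` attributes. Note that iterating a live consumer blocks while it waits for new messages, so the function would not return on its own.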

# 2.3 Message arrival

producer.gif

# 3 Implementation of Streaming processing using KSQL

Next, let's try stream processing. There is nothing special about the term: the unbounded sequence of messages (events) flowing into a Topic is simply called a stream. KSQL lets you query, filter, and aggregate those events with SQL-like statements. It can turn the continuous data flowing into a Topic into another continuous data set (Stream) or into aggregated data (Table), and that data can in turn be written to a new Topic for another application to process. See the links below for details.

ref) https://kafka.apache.org/documentation/streams/
ref) https://www.youtube.com/watch?v=DPGn-j7yD68

Streams and Tables are basically always running (24/7), so it is easier to get started if you think of them as being handled the same way as Topics.
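The difference between a Stream and a Table can be sketched in plain Python. This is only a mental model built on hypothetical events, not how KSQL is implemented: the stream keeps every event, while the table keeps one continuously updated row per key.

```python
from collections import Counter

# Hypothetical events, in arrival order: (userId, word).
events = [(1000, "hello"), (3000, "fuga"), (1000, "world"), (1000, "hoge")]

# Stream view: every event is kept as its own record.
stream = list(events)

# Table view: one row per key, updated as events arrive.
latest_word = {}          # latest value per userId
word_count = Counter()    # running aggregate per userId
for user_id, word in events:
    latest_word[user_id] = word
    word_count[user_id] += 1

print(len(stream))        # 4
print(latest_word)        # {1000: 'hoge', 3000: 'fuga'}
print(dict(word_count))   # {1000: 3, 3000: 1}
```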

# 3.1 Preparation

First, get KSQL, which is developed by Confluent.

cd ~/kafka
git clone https://github.com/confluentinc/ksql.git
cd ksql

# 3.2 Starting KSQL server / KSQL CLI

# Return to the kafka-docker directory
cd ../kafka-docker
# Get the IP addresses and port numbers of the running Kafka brokers
# ($(...) is bash command substitution; fish also accepts the (...) form)
export KSQL_BOOTSTRAP_SERVERS=$(docker-compose exec kafka netstat |awk '{ print $5 }' |grep '^1.*:32.*' |sort |uniq |tr '\n' ',')
#Move to ksql directory
cd ../ksql
#start ksql server
docker run -d -p $DOCKER_HOST_IP:8088:8088 \
                 -e KSQL_BOOTSTRAP_SERVERS=$KSQL_BOOTSTRAP_SERVERS \
                 -e KSQL_OPTS="-Dksql.service.id=ksql_service_3_  -Dlisteners=http://0.0.0.0:8088/" \
                 confluentinc/cp-ksql-server:5.3.1
#docker process confirmation
docker ps
# confluentinc/cp-ksql-server:5.3.1 container is running

#Start KSQL CLI
docker run -it confluentinc/cp-ksql-cli http://$DOCKER_HOST_IP:8088

If the KSQL CLI starts successfully, a prompt like the one below appears.

ksql.png

# 3.3 Creating a Stream

Here, let's create a Stream and a Table for streaming processing, starting with topic1.

ksql> show streams;
#  Stream Name | Kafka Topic | Format
# ------------------------------------
# ------------------------------------

ksql> CREATE STREAM topic1_stream1 (userId INT, messageId VARCHAR, message STRUCT<word VARCHAR, wordId INT>) WITH (KAFKA_TOPIC = 'topic1', VALUE_FORMAT='JSON', KEY='userId');
ksql> show streams;
#  Stream Name    | Kafka Topic | Format
# ---------------------------------------
#  TOPIC1_STREAM1 | topic1      | JSON
# ---------------------------------------

ksql> CREATE TABLE topic1_table1 (userId INT, wordCount INT, message STRUCT<word VARCHAR, wordId INT>) WITH (KAFKA_TOPIC = 'topic1', VALUE_FORMAT='JSON', KEY='userId');
ksql> show tables;
#  Table Name    | Kafka Topic | Format | Windowed
# -------------------------------------------------
#  TOPIC1_TABLE1 | topic1      | JSON   | false
# -------------------------------------------------

※ Important: there are some restrictions on what a Stream or Table can be created from. It took me a lot of trial and error to learn these rules.
ref) https://docs.confluent.io/current/ksql/docs/developer-guide/join-streams-and-tables.html

| | from Topic | from Stream | from Stream-Stream | from Table-Table | from Stream-Table |
|---|---|---|---|---|---|
| CREATE Stream | o | o | o | x | o |
| CREATE Table | o | o | x | o | x |

As in SQL, you use JOIN syntax to create a new Stream or Table from two resources. Note that a JOIN is possible only on the value set as the KEY of each resource. In other words, in the example above you cannot JOIN on two columns between a Stream created from topic1 and a Stream created from another topic. (Example: you cannot build a new Stream from events that match on both userId = 2000 and wordCount = 5.)

If you want to JOIN on multiple columns, you can work around this by adding a column to the Topic message that combines them and setting that column as the KEY. (Example: KEY => ${userId}-${wordCount})
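On the producer side, adding such a combined column might look like the following sketch. The column name `userWordKey` and the helper `with_composite_key` are hypothetical; any name works as long as the same column is then declared as the KEY in KSQL.

```python
import json


def with_composite_key(message: dict) -> dict:
    """Return a copy of the message with a combined 'userWordKey' column added.

    'userWordKey' is a hypothetical column name chosen for this sketch.
    """
    enriched = dict(message)
    enriched["userWordKey"] = f"{message['userId']}-{message['wordCount']}"
    return enriched


msg = with_composite_key({"userId": 2000, "wordCount": 5})
print(json.dumps(msg))
```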

Likewise, to GROUP BY in a query against a Table, the target column must be the KEY.

# 3.4 Query to Stream

A query against a Stream always runs against newly arriving messages. In other words, messages that were already stored in the Topic before the query was issued do not appear in the result. As mentioned at the beginning of this chapter, Streams and Tables are always running and are created in advance, just like Topics. This is easy to miss when you first touch KSQL, and it can leave you wondering what it is for and when to use it. In a real system, stream processing is rarely done through the CLI, but since this is a hands-on exercise, it helps for debugging to see query results for messages that are already in the Topic. To do that, set the following value in the KSQL CLI.

ksql> SET 'auto.offset.reset'='earliest';
#Get all events in Stream
ksql> select * from topic1_stream1;
# 1576936834754 | 2019/12/21 | 3000 | 2019/12/21-23:00:34:614230 | {WORD=fuga, WORDID=4}
# 1576936837399 | 2019/12/21 | 1000 | 2019/12/21-23:00:37:275858 | {WORD=hello world, WORDID=5}
# 1576936837512 | 2019/12/21 | 1000 | 2019/12/21-23:00:37:275858 | {WORD=hoge, WORDID=3}
---
# Count how many messages each user sent at the same time (same messageId) in the Stream
ksql> select userId, count(messageId) from topic1_stream1 group by userId,  messageId;
# 1000 | 3
# 3000 | 2
# 3000 | 1
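For intuition, the GROUP BY query above corresponds roughly to the following Python. The events are hypothetical, and one difference matters: KSQL keeps emitting updated results as events arrive, whereas this sketch counts a fixed list once.

```python
from collections import Counter

# Hypothetical events: (userId, messageId). One script run shares one messageId.
events = [
    (1000, "2019/12/21-23:00:37:275858"),
    (1000, "2019/12/21-23:00:37:275858"),
    (3000, "2019/12/21-23:00:34:614230"),
    (1000, "2019/12/21-23:00:37:275858"),
]

# Rough equivalent of:
#   SELECT userId, COUNT(messageId) ... GROUP BY userId, messageId
counts = Counter(events)
for (user_id, message_id), count in counts.items():
    print(user_id, "|", count)
```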

In addition to the aggregate functions that KSQL provides by default, developers can define their own.
ref) https://docs.confluent.io/current/ksql/docs/developer-guide/syntax-reference.html#aggregate-functions

The following document is also very helpful for aggregation. A wide range of queries is possible, such as splitting events into time windows and aggregating each window.
ref) https://docs.confluent.io/current/ksql/docs/developer-guide/aggregate-streaming-data.html#aggregate-streaming-data-with-ksql

In this state, try sending messages to topic1 by following the procedure in # 2.2.

Stream.gif

# 3.5 Stream + Stream => Stream => Table

Finally, as an advanced exercise, let's create a new Stream from two Streams and then query it to create a Table.

As an example, imagine a scenario where a user is picked at random by lottery and the keywords that user has sent in the past 60 minutes are extracted. (Please bear with me; I could not come up with a better example.)

First, let's copy topic1-producer.py and create topic2-producer.py.

cp topic{1,2}-producer.py

topic2-producer.py


from kafka import KafkaProducer
from datetime import datetime
import os
import subprocess
import json
import random

# Directory of the cloned kafka-docker repository (run this script from ~/kafka)
cwd_name = os.path.join(os.getcwd(), "kafka-docker")
host_ip = subprocess.check_output("ipconfig getifaddr en0", shell=True).decode('utf-8').rstrip('\n')
# Pass DOCKER_HOST_IP through the environment; ${host_ip} inside the shell string did not refer to the Python variable
compose_env = dict(os.environ, DOCKER_HOST_IP=host_ip)
netstat_result = subprocess.check_output("docker-compose exec kafka netstat |awk '{ print $5 }' |grep '^1.*:32.*'", cwd=cwd_name, shell=True, env=compose_env).decode('utf-8').rstrip('\n')
kafka_ips = list(set(netstat_result.split('\n')))
# print(kafka_ips)

date = datetime.now().strftime("%Y/%m/%d")

user_id = random.choice([1000, 2000, 3000])
producer = KafkaProducer(bootstrap_servers=kafka_ips, value_serializer=lambda m: json.dumps(m).encode('utf-8'))
kafka_msg = {'userId': user_id}
producer.send('topic2', key=date.encode('utf-8'), value=kafka_msg).get(timeout=1)

After creating the file as above, let's create a Stream from topic2 with `userId` as the KEY, just as we did for topic1.

ksql> CREATE STREAM topic2_stream1 (userId INTEGER) WITH (KAFKA_TOPIC = 'topic2', VALUE_FORMAT='JSON', KEY='userId');
ksql> show streams;
#  Stream Name    | Kafka Topic | Format
# ---------------------------------------
#  TOPIC2_STREAM1 | topic2      | JSON
#  TOPIC1_STREAM1 | topic1      | JSON
# ---------------------------------------

Next, create a new Stream from the events whose `userId` matches in the two Streams. Because the trigger is a new message (event) arriving at topic2, the topic2 Stream goes on the LEFT side.
ref) https://docs.confluent.io/current/ksql/docs/developer-guide/join-streams-and-tables.html#semantics-of-stream-stream-joins

# Topic2 Stream + Topic1 Stream => New Stream
ksql> CREATE STREAM topic1_topic2_stream1 AS SELECT t2s1.userId as userId, t1s1.messageId, t1s1.message FROM topic2_stream1 t2s1 INNER JOIN topic1_stream1 t1s1 WITHIN 1 HOURS ON t2s1.userId = t1s1.userId;

# If you ran SET 'auto.offset.reset'='earliest'; in 3.4, run the command below to restore the default so that only new changes appear in the query results.
ksql> SET 'auto.offset.reset'='latest';

ksql> select * from topic1_topic2_stream1;

In this state, try running topic2-producer.py from another tab. When it runs, the messages (events) that arrived at topic1_stream1 within the past hour are displayed as shown below.

StreamJoin.gif
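The idea behind the WITHIN 1 HOURS join can be sketched in plain Python. This is a simplified model on hypothetical events: it pairs records with the same userId whose timestamps are at most one hour apart, which is my understanding of the window semantics, and it ignores how KSQL stores and expires join state.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(hours=1)

# Hypothetical events: (timestamp, userId, word) for topic1,
# and (timestamp, userId) for topic2.
topic1_events = [
    (datetime(2019, 12, 22, 9, 0), 1000, "hello"),
    (datetime(2019, 12, 22, 9, 50), 1000, "fuga"),
    (datetime(2019, 12, 22, 9, 55), 3000, "hoge"),
]
topic2_events = [(datetime(2019, 12, 22, 10, 30), 1000)]


def join_within(left, right, window):
    """Inner join on userId, keeping pairs whose timestamps differ by at most `window`."""
    joined = []
    for left_ts, left_user in left:
        for right_ts, right_user, word in right:
            if left_user == right_user and abs(left_ts - right_ts) <= window:
                joined.append((left_user, word))
    return joined


result = join_within(topic2_events, topic1_events, WINDOW)
print(result)  # [(1000, 'fuga')]
```

Here "hello" is dropped because it arrived 90 minutes before the topic2 event, and "hoge" is dropped because it belongs to a different user.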

Finally, let's create a Table from a query against the topic1_topic2_stream1 Stream.

#Create Table from query to Stream
ksql> CREATE TABLE topic1_topic2_table1 AS SELECT userId, COLLECT_SET(message->word) as word FROM topic1_topic2_stream1 GROUP BY userId;

#If you execute the following query while sending a message to Topic2, you can see how a new message (event) is created.
ksql> select * from topic1_topic2_table1;
# 1576940945888 | 2019/12/22 | 1000 | [hello, hello world, fuga, hoge, world]
# 1576941043356 | 2019/12/22 | 3000 | [hello, hello world, fuga]
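What COLLECT_SET with GROUP BY userId does here can be pictured as follows. This is a sketch on hypothetical joined events, and it does not claim anything about the order in which KSQL itself lists the collected values.

```python
# Hypothetical joined events: (userId, word), in arrival order.
joined_events = [(1000, "hello"), (1000, "fuga"), (3000, "hello"), (1000, "hello")]

collected = {}
for user_id, word in joined_events:
    words = collected.setdefault(user_id, [])
    if word not in words:      # keep each distinct word once, like a set
        words.append(word)

print(collected)  # {1000: ['hello', 'fuga'], 3000: ['hello']}
```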

This is the end of the hands-on content.
