Save json data received by mosquitto on docker to db and Elasticsearch

kibana.png

This time I will use it personally, so I will not talk about AWS IoT, which costs money, but I will clean up everything on the docker machine of conoha that I use with owncloud. (I will not write the procedure, but I will only open a specific high port to the outside. I plan to communicate with mosquitto by SSL)

This time, I will summarize the points that I mainly referred to below.

mosquitto mosquitto-ssl paho-mqtt pymongo elasticseach BulkAPI Bulk helpers how-to-use-bulk-api-to-store-the-keywords-in-es-by-using-python elasticsearch-py

A little explanation

Container creation

I'm verifying it, so I don't use docke-compose or script files, it's mostly manual. It's easy, so let's set up the container quickly.

busybox (for mysql)

Arbitrary directory

command.


docker pull busybox
docker run -i -t -v /var/lib/mysql --name busybox01 busybox /bin/sh

mysql Arbitrary directory

Dcokerfile.


FROM mysql
EXPOSE 3306
ENV container Docker
ENV MYSQL_ROOT_USER root
ENV MYSQL_ROOT_PASSWORD rootpass
ENV MYSQL_DATABASE log_db
ENV MYSQL_USER test
ENV MYSQL_PASSWORD testpass

command.


docker build -f ./Dockerfile -t mysql:mosql --no-cache=true .
docker run -d --name mosdb01 --volumes-from busybox01 -e MYSQL_ROOT_PASSWORD=rootpass mysql:mosql

Elasticseach Arbitrary directory

command.


docker pull elasticsearch
docker run --name moselasticsearch01 -d elasticsearch

Kibana Arbitrary directory

command.


docker pull kibana
docker run --name moskibana01 --link moselasticsearch01:elasticsearch -d -p 5601:5601 kibana

mosquitto Arbitrary directory I will do various things on this container, so I put a lot in it. Delete the ones you don't need.

Dockerfile.


FROM       ubuntu
MAINTAINER Ototo
ENV        container docker
RUN        apt-get -y update
RUN        apt-get install -y mosquitto
RUN        apt-get install -y openssl
RUN        apt-get install -y python
RUN        apt-get install -y python-pip
RUN        apt-get install -y mysql-client
RUN        apt-get install -y vim
RUN        pip install --upgrade pip
RUN        apt-get install -y curl
RUN        pip install elasticsearch
RUN        pip install pymongo
RUN        pip install paho-mqtt
RUN        pip install PyMySQL
EXPOSE     1883
ENTRYPOINT service mosquitto start && /bin/bash --login

command.


docker build -f ./Dockerfile -t ubuntu:mosquitto --no-cache=true .
docker run --name mosquitto01 -t -i -d -v /root/mosquitto/mosquitto:/etc/mosquitto --link mosdb01 --link moselasticsearch01 -p 1883:1883  ubuntu:mosquitto /bin/bash

Enter the container.

docker attach [conteinarID]

Log in to mysql.

mysql -h mosdb01 -u root -p
Enter password: rootpass

For the time being, I will make only the table.

mysql>use log_db

mysql>CREATE TABLE `log_data` ( `id` int(11) NOT NULL AUTO_INCREMENT,
                                `date_time` datetime(1), 
                                `data` json NOT NULL,
                                PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;

mysqll>exit

Edit the config.

vim /etc/mosquitto/mosquitto.conf

Add the following to enable the password

allow_anonymous false
password_file /etc/mosquitto/passwd

Create password file and set user and password

mosquitto_passwd -c /etc/mosquitto/passwd mqtt
Password: mqttpass
Reenter password: mqttpass

Service restart (or container restart at the end)

service mosquitto restart

Creating a script file

vim /etc/mosquitto/sub.py

script I'm digging into db and elasticseach here. Since exception handling is not included, it is safe to write it as you like.

sub.py


#!/usr/bin/env python
import paho.mqtt.client as mqtt
import pymysql.cursors
from pymongo import MongoClient
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime


username = "mqtt"
userpass = "mqttpass"
host = "localhost"
topic = "car/log"
eshost = "http://moselasticsearch01:9200"
esindex = "log"
estype = "carlog"


def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe(topic, qos=0)

def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))
    set_mysql(msg.payload)
    set_es(msg.payload)

#insert mysql
def set_mysql(data):
        connection = pymysql.connect(host= 'mosdb01',
                                     user='root',
                                     password='rootpass',
                                     db='log_db',
                                     charset='utf8',
                                     cursorclass=pymysql.cursors.DictCursor)
        with connection.cursor() as cursor:
            sql = "INSERT INTO log_data(date_time,data) VALUES ( %s, %s)"
            r = cursor.execute(sql, ( datetime.now().strftime( '%Y-%m-%d %H:%M:%S' ), data))
            print(r) # -> 1
            connection.commit()

#insert elasticseach
def set_es(data):
        es = Elasticsearch([eshost])
        actions = []
        action = {"_index": esindex,"_type": estype,"_source": data}
        actions.append(action)
        log = helpers.bulk(es, actions)

#main
if __name__ == '__main__':
        client = mqtt.Client()
        client.on_connect = on_connect
        client.on_message = on_message
        client.username_pw_set(username, password=userpass)
        client.connect(host,1883)
        client.loop_forever()

Launching the service is a defeat job that lets you do it when you log in to bash

vim ~/.bashrc

Add the following

nohup python /etc/mosquitto/sub.py &

Create the following file because the index is registered in elasticseach

index.json


{
  "mappings" : {
    "carlog" : {
      "properties" : {
        "battery_level" : {
          "type" : "float",
          "index" : "not_analyzed"
        },
        "gps_gga" : {
          "type" : "string",
          "index" : "not_analyzed"
        },
        "gps_gsa" : {
          "type" : "string",
          "index" : "not_analyzed"
        },
        "gps_gsv" : {
          "type" : "string",
          "index" : "not_analyzed"
        },
        "gps_rmc" : {
          "type" : "string",
          "index" : "not_analyzed"
        },
        "location" : {
          "type" : "geo_point"
        },
        "oil_press" : {
          "type" : "float",
          "index" : "not_analyzed"
        },
        "oil_temp" : {
          "type" : "integer",
          "index" : "not_analyzed"
        },
        "timestamp" : {
          "type" : "date",
          "format": "YYYY-MM-dd HH:mm:ss"
        },
        "water_temp" : {
          "type" : "integer",
          "index" : "not_analyzed"
        }
      }
    }
  }
}

Index registration

Registration command.


curl -XPUT moselasticsearch01:9200/log --data-binary @index.json

After registering the index, restart the container when you leave the container. At this point, the work on docker is complete.

If you want to register directly with Bulk API, you can do it by creating the following file on the container and executing the command.

log.json


{ "index" : {"_index" : "log","_type" : "carlog"}
{"timestamp":"2016-09-16 19:50:00" ,"battery_level":12.0, "location":{"lon":"144.1","lat":"43.5"}, "water_temp": 90, "oil_temp":80, "oil_press":2.0,  "gps_gsa":"$GPGSA", "gps_rmc":"$GPRMC", "gps_gga":"$GPGGA", "gps_gsv":"$GPGSV"}

command.


curl -XPOST moselasticsearch01:9200/_bulk --data-binary @log.json

raspberry pi Throw data from pi to the container. For verification without details, execute the following script after executing pip install paho-mqtt.

pub.py


#!/usr/bin/env python
import json
import paho.mqtt.client as mqtt
from datetime import datetime

#test_datqa
data = {"timestamp":"" ,"battery_level":0, "location":{"lon":"","lat":""}, "water_temp": 0, "oil_temp":0, "oil_press":0,  "gps_gsa":"", "gps_rmc":"", "gps_gga":"", "gps_gsv":""}

WTMP = 80
OTMP = 90
OPLS = 2.0
BLVL = 12.0
GSA = "$GPGSA"
GGA = "$GPGGA"
RMC ="$GPRMC"
GSV = "$GPGSV"
data["timestamp"]=datetime.now().strftime( '%Y-%m-%d %H:%M:%S' )
data["battery_level"] = BLVL
data["water_temp"] = WTMP
data["oil_temp"] = OTMP
data["oil_press"] = OPLS
data["gps_gsa"] = GSA
data["gps_gga"] = GGA
data["gps_rmc"] = RMC
data["gps_gsv"] = GSV
data["location"]["lon"] = "139.79"
data["location"]["lat"] = "35.67"
json_data = json.dumps(data)

username = "mqtt"
userpass = "mqttpass"
host = "mosquitto server IP"
topic = "car/log"

#main
if __name__ == '__main__':
    client = mqtt.Client()
    client.username_pw_set(username, password=userpass)
    client.connect(host,1883, 60)
    client.publish(topic, payload=json_data, qos=1, retain=False)
    client.disconnect()

After that, let's access kibana and check if the data is coming. discover.png

Recommended Posts

Save json data received by mosquitto on docker to db and Elasticsearch
Encrypt and save data on jupyter and decrypt if necessary
[Introduction to cx_Oracle] (Part 6) DB and Python data type mapping
How to return the data contained in django model in json format and map it on leaflet
[pepper] Pass the JSON data obtained by python request to the tablet.
It's time to install DB with Docker! DB installation for beginners on Docker
Addicted to character code by inserting and extracting data with SQLAlchemy