Enregistrer les données JSON reçues par Mosquitto sur Docker vers DB et Elasticsearch

kibana.png

Cette fois, je vais l'utiliser personnellement, donc je ne parlerai pas d'AWS IoT, qui coûte de l'argent, mais je vais tout nettoyer sur la machine docker conoha que j'utilise avec owncloud. (Je n'écrirai pas la procédure, mais je n'ouvrirai qu'un port haut spécifique vers l'extérieur. Je prévois de communiquer avec mosquitto par SSL)

Cette fois, je résumerai les points auxquels j'ai principalement fait référence ci-dessous.

mosquitto mosquitto-ssl paho-mqtt pymongo elasticseach BulkAPI Bulk helpers how-to-use-bulk-api-to-store-the-keywords-in-es-by-using-python elasticsearch-py

Une petite explication

Création de conteneurs

Je le vérifie, donc je n'utilise pas de fichiers docke-compose ou script, c'est principalement manuel. C'est facile, alors configurons le conteneur rapidement.

busybox (pour mysql)

Annuaire arbitraire

commander.


docker pull busybox
docker run -i -t -v /var/lib/mysql --name busybox01 busybox /bin/sh

mysql Annuaire arbitraire

Dcokerfile.


FROM mysql
EXPOSE 3306
ENV container Docker
ENV MYSQL_ROOT_USER root
ENV MYSQL_ROOT_PASSWORD rootpass
ENV MYSQL_DATABASE log_db
ENV MYSQL_USER test
ENV MYSQL_PASSWORD testpass

commander.


docker build -f ./Dockerfile -t mysql:mosql --no-cache=true .
docker run -d --name mosdb01 --volumes-from busybox01 -e MYSQL_ROOT_PASSWORD=rootpass mysql:mosql

Elasticseach Annuaire arbitraire

commander.


docker pull elasticsearch
docker run --name moselasticsearch01 -d elasticsearch

Kibana Annuaire arbitraire

commander.


docker pull kibana
docker run --name moskibana01 --link moselasticsearch01:elasticsearch -d -p 5601:5601 kibana

mosquitto Annuaire arbitraire Je vais faire diverses choses sur ce conteneur, donc j'en ai mis beaucoup. Supprimez ceux dont vous n'avez pas besoin.

Dockerfile.


FROM       ubuntu
MAINTAINER Ototo
ENV        container docker
RUN        apt-get -y update
RUN        apt-get install -y mosquitto
RUN        apt-get install -y openssl
RUN        apt-get install -y python
RUN        apt-get install -y python-pip
RUN        apt-get install -y mysql-client
RUN        apt-get install -y vim
RUN        pip install --upgrade pip
RUN        apt-get install -y curl
RUN        pip install elasticsearch
RUN        pip install pymongo
RUN        pip install paho-mqtt
RUN        pip install PyMySQL
EXPOSE     1883
ENTRYPOINT service mosquitto start && /bin/bash --login

commander.


docker build -f ./Dockerfile -t ubuntu:mosquitto --no-cache=true .
docker run --name mosquitto01 -t -i -d -v /root/mosquitto/mosquitto:/etc/mosquitto --link mosdb01 --link moselasticsearch01 -p 1883:1883  ubuntu:mosquitto /bin/bash

Entrez dans le conteneur.

docker attach [conteinarID]

Connectez-vous à mysql.

mysql -h mosdb01 -u root -p
Enter password: rootpass

Pour le moment, je ne ferai que le tableau.

mysql>use log_db

mysql>CREATE TABLE `log_data` ( `id` int(11) NOT NULL AUTO_INCREMENT,
                                `date_time` datetime(1), 
                                `data` json NOT NULL,
                                PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;

mysqll>exit

Modifiez le fichier config.

vim /etc/mosquitto/mosquitto.conf

Ajoutez ce qui suit pour activer le mot de passe

allow_anonymous false
password_file /etc/mosquitto/passwd

Création d'un fichier de mots de passe et définition des utilisateurs et des mots de passe

mosquitto_passwd -c /etc/mosquitto/passwd mqtt
Password: mqttpass
Reenter password: mqttpass

Redémarrage du service (peut être le redémarrage du conteneur à la fin)

service mosquitto restart

Créer un fichier de script

vim /etc/mosquitto/sub.py

scénario Je fouille dans db et elasticseach ici. La gestion des exceptions n'étant pas incluse, vous pouvez l'écrire comme vous le souhaitez.

sub.py


#!/usr/bin/env python
import paho.mqtt.client as mqtt
import pymysql.cursors
from pymongo import MongoClient
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime


username = "mqtt"
userpass = "mqttpass"
host = "localhost"
topic = "car/log"
eshost = "http://moselasticsearch01:9200"
esindex = "log"
estype = "carlog"


def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe(topic, qos=0)

def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))
    set_mysql(msg.payload)
    set_es(msg.payload)

#insert mysql
def set_mysql(data):
        connection = pymysql.connect(host= 'mosdb01',
                                     user='root',
                                     password='rootpass',
                                     db='log_db',
                                     charset='utf8',
                                     cursorclass=pymysql.cursors.DictCursor)
        with connection.cursor() as cursor:
            sql = "INSERT INTO log_data(date_time,data) VALUES ( %s, %s)"
            r = cursor.execute(sql, ( datetime.now().strftime( '%Y-%m-%d %H:%M:%S' ), data))
            print(r) # -> 1
            connection.commit()

#insert elasticseach
def set_es(data):
        es = Elasticsearch([eshost])
        actions = []
        action = {"_index": esindex,"_type": estype,"_source": data}
        actions.append(action)
        log = helpers.bulk(es, actions)

#main
if __name__ == '__main__':
        client = mqtt.Client()
        client.on_connect = on_connect
        client.on_message = on_message
        client.username_pw_set(username, password=userpass)
        client.connect(host,1883)
        client.loop_forever()

Le lancement du service est un travail de défaite qui vous permet de le faire lorsque vous vous connectez à bash

vim ~/.bashrc

Ajoutez ce qui suit

nohup python /etc/mosquitto/sub.py &

Créez le fichier suivant car l'index est enregistré dans elasticseach

index.json


{
  "mappings" : {
    "carlog" : {
      "properties" : {
        "battery_level" : {
          "type" : "float",
          "index" : "not_analyzed"
        },
        "gps_gga" : {
          "type" : "string",
          "index" : "not_analyzed"
        },
        "gps_gsa" : {
          "type" : "string",
          "index" : "not_analyzed"
        },
        "gps_gsv" : {
          "type" : "string",
          "index" : "not_analyzed"
        },
        "gps_rmc" : {
          "type" : "string",
          "index" : "not_analyzed"
        },
        "location" : {
          "type" : "geo_point"
        },
        "oil_press" : {
          "type" : "float",
          "index" : "not_analyzed"
        },
        "oil_temp" : {
          "type" : "integer",
          "index" : "not_analyzed"
        },
        "timestamp" : {
          "type" : "date",
          "format": "YYYY-MM-dd HH:mm:ss"
        },
        "water_temp" : {
          "type" : "integer",
          "index" : "not_analyzed"
        }
      }
    }
  }
}

Enregistrement d'index

Commande d'enregistrement.


curl -XPUT moselasticsearch01:9200/log --data-binary @index.json

Après avoir enregistré l'index, redémarrez le conteneur lorsque vous quittez le conteneur. À ce stade, le travail sur le docker est terminé.

Si vous souhaitez vous inscrire directement auprès de l'API Bulk, vous pouvez le faire en créant le fichier suivant sur le conteneur et en exécutant la commande.

log.json


{ "index" : {"_index" : "log","_type" : "carlog"}
{"timestamp":"2016-09-16 19:50:00" ,"battery_level":12.0, "location":{"lon":"144.1","lat":"43.5"}, "water_temp": 90, "oil_temp":80, "oil_press":2.0,  "gps_gsa":"$GPGSA", "gps_rmc":"$GPRMC", "gps_gga":"$GPGGA", "gps_gsv":"$GPGSV"}

commander.


curl -XPOST moselasticsearch01:9200/_bulk --data-binary @log.json

raspberry pi Jetez les données de pi vers le conteneur. Sauf pour les détails, la vérification se fait en exécutant le script suivant après avoir exécuté pip install paho-mqtt.

pub.py


#!/usr/bin/env python
import json
import paho.mqtt.client as mqtt
from datetime import datetime

#test_datqa
data = {"timestamp":"" ,"battery_level":0, "location":{"lon":"","lat":""}, "water_temp": 0, "oil_temp":0, "oil_press":0,  "gps_gsa":"", "gps_rmc":"", "gps_gga":"", "gps_gsv":""}

WTMP = 80
OTMP = 90
OPLS = 2.0
BLVL = 12.0
GSA = "$GPGSA"
GGA = "$GPGGA"
RMC ="$GPRMC"
GSV = "$GPGSV"
data["timestamp"]=datetime.now().strftime( '%Y-%m-%d %H:%M:%S' )
data["battery_level"] = BLVL
data["water_temp"] = WTMP
data["oil_temp"] = OTMP
data["oil_press"] = OPLS
data["gps_gsa"] = GSA
data["gps_gga"] = GGA
data["gps_rmc"] = RMC
data["gps_gsv"] = GSV
data["location"]["lon"] = "139.79"
data["location"]["lat"] = "35.67"
json_data = json.dumps(data)

username = "mqtt"
userpass = "mqttpass"
host = "IP du serveur mosquitto"
topic = "car/log"

#main
if __name__ == '__main__':
    client = mqtt.Client()
    client.username_pw_set(username, password=userpass)
    client.connect(host,1883, 60)
    client.publish(topic, payload=json_data, qos=1, retain=False)
    client.disconnect()

Après cela, accédons à kibana et vérifions si les données arrivent. discover.png

Recommended Posts

Enregistrer les données JSON reçues par Mosquitto sur Docker vers DB et Elasticsearch
Crypter et enregistrer les données sur jupyter et décrypter si nécessaire
[Introduction à cx_Oracle] (Partie 6) Mappage des types de données DB et Python
Comment retourner les données contenues dans le modèle django au format json et les mapper sur le dépliant
[pepper] Transmettez toutes les données JSON obtenues par requête python à la tablette.
Il est maintenant temps d'installer DB avec Docker! Installation de la base de données pour les débutants sur Docker
Accro au code de caractère en insérant et en extrayant des données avec SQL Alchemy