Speichern Sie die von Moskito auf Docker empfangenen JSON-Daten in DB und Elasticsearch

kibana.png

Dieses Mal werde ich es persönlich verwenden, damit ich nicht über AWS IoT spreche, das Geld kostet, sondern alles auf der Conoha Docker-Maschine bereinige, die ich mit owncloud verwende. (Ich werde die Prozedur nicht schreiben, aber ich werde nur einen bestimmten High-Port nach außen öffnen. Ich plane, über SSL mit Moskito zu kommunizieren.)

Dieses Mal werde ich die Punkte zusammenfassen, auf die ich im Folgenden hauptsächlich Bezug genommen habe.

mosquitto mosquitto-ssl paho-mqtt pymongo elasticseach BulkAPI Bulk helpers how-to-use-bulk-api-to-store-the-keywords-in-es-by-using-python elasticsearch-py

Eine kleine Erklärung

Containererstellung

Ich überprüfe es, daher verwende ich keine Docke-Compose- oder Skriptdateien, es ist meistens manuell. Es ist einfach, also richten wir den Container schnell ein.

Busybox (für MySQL)

Beliebiges Verzeichnis

Befehl.


docker pull busybox
docker run -i -t -v /var/lib/mysql --name busybox01 busybox /bin/sh

mysql Beliebiges Verzeichnis

Dcokerfile.


FROM mysql
EXPOSE 3306
ENV container Docker
ENV MYSQL_ROOT_USER root
ENV MYSQL_ROOT_PASSWORD rootpass
ENV MYSQL_DATABASE log_db
ENV MYSQL_USER test
ENV MYSQL_PASSWORD testpass

Befehl.


docker build -f ./Dockerfile -t mysql:mosql --no-cache=true .
docker run -d --name mosdb01 --volumes-from busybox01 -e MYSQL_ROOT_PASSWORD=rootpass mysql:mosql

Elasticseach Beliebiges Verzeichnis

Befehl.


docker pull elasticsearch
docker run --name moselasticsearch01 -d elasticsearch

Kibana Beliebiges Verzeichnis

Befehl.


docker pull kibana
docker run --name moskibana01 --link moselasticsearch01:elasticsearch -d -p 5601:5601 kibana

mosquitto Beliebiges Verzeichnis Ich werde verschiedene Dinge an diesem Container tun, also habe ich viel hineingesteckt. Löschen Sie diejenigen, die Sie nicht benötigen.

Dockerfile.


FROM       ubuntu
MAINTAINER Ototo
ENV        container docker
RUN        apt-get -y update
RUN        apt-get install -y mosquitto
RUN        apt-get install -y openssl
RUN        apt-get install -y python
RUN        apt-get install -y python-pip
RUN        apt-get install -y mysql-client
RUN        apt-get install -y vim
RUN        pip install --upgrade pip
RUN        apt-get install -y curl
RUN        pip install elasticsearch
RUN        pip install pymongo
RUN        pip install paho-mqtt
RUN        pip install PyMySQL
EXPOSE     1883
ENTRYPOINT service mosquitto start && /bin/bash --login

Befehl.


docker build -f ./Dockerfile -t ubuntu:mosquitto --no-cache=true .
docker run --name mosquitto01 -t -i -d -v /root/mosquitto/mosquitto:/etc/mosquitto --link mosdb01 --link moselasticsearch01 -p 1883:1883  ubuntu:mosquitto /bin/bash

Betritt den Container.

docker attach [conteinarID]

Melden Sie sich bei MySQL an.

mysql -h mosdb01 -u root -p
Enter password: rootpass

Vorerst werde ich nur den Tisch machen.

mysql>use log_db

mysql>CREATE TABLE `log_data` ( `id` int(11) NOT NULL AUTO_INCREMENT,
                                `date_time` datetime(1), 
                                `data` json NOT NULL,
                                PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;

mysqll>exit

Bearbeiten Sie die Konfiguration.

vim /etc/mosquitto/mosquitto.conf

Fügen Sie Folgendes hinzu, um das Kennwort zu aktivieren

allow_anonymous false
password_file /etc/mosquitto/passwd

Erstellen einer Kennwortdatei und Festlegen von Benutzern und Kennwörtern

mosquitto_passwd -c /etc/mosquitto/passwd mqtt
Password: mqttpass
Reenter password: mqttpass

Neustart des Dienstes (kann am Ende ein Neustart des Containers sein)

service mosquitto restart

Erstellen einer Skriptdatei

vim /etc/mosquitto/sub.py

Skript Ich beschäftige mich hier mit DB und Elasticseach. Da die Ausnahmebehandlung nicht enthalten ist, können Sie sie sicher nach Ihren Wünschen schreiben.

sub.py


#!/usr/bin/env python
import paho.mqtt.client as mqtt
import pymysql.cursors
from pymongo import MongoClient
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime


username = "mqtt"
userpass = "mqttpass"
host = "localhost"
topic = "car/log"
eshost = "http://moselasticsearch01:9200"
esindex = "log"
estype = "carlog"


def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe(topic, qos=0)

def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))
    set_mysql(msg.payload)
    set_es(msg.payload)

#insert mysql
def set_mysql(data):
        connection = pymysql.connect(host= 'mosdb01',
                                     user='root',
                                     password='rootpass',
                                     db='log_db',
                                     charset='utf8',
                                     cursorclass=pymysql.cursors.DictCursor)
        with connection.cursor() as cursor:
            sql = "INSERT INTO log_data(date_time,data) VALUES ( %s, %s)"
            r = cursor.execute(sql, ( datetime.now().strftime( '%Y-%m-%d %H:%M:%S' ), data))
            print(r) # -> 1
            connection.commit()

#insert elasticseach
def set_es(data):
        es = Elasticsearch([eshost])
        actions = []
        action = {"_index": esindex,"_type": estype,"_source": data}
        actions.append(action)
        log = helpers.bulk(es, actions)

#main
if __name__ == '__main__':
        client = mqtt.Client()
        client.on_connect = on_connect
        client.on_message = on_message
        client.username_pw_set(username, password=userpass)
        client.connect(host,1883)
        client.loop_forever()

Das Starten des Dienstes ist ein Fehljob, mit dem Sie ihn ausführen können, wenn Sie sich bei bash anmelden

vim ~/.bashrc

Fügen Sie Folgendes hinzu

nohup python /etc/mosquitto/sub.py &

Erstellen Sie die folgende Datei, da der Index in elasticseach registriert ist

index.json


{
  "mappings" : {
    "carlog" : {
      "properties" : {
        "battery_level" : {
          "type" : "float",
          "index" : "not_analyzed"
        },
        "gps_gga" : {
          "type" : "string",
          "index" : "not_analyzed"
        },
        "gps_gsa" : {
          "type" : "string",
          "index" : "not_analyzed"
        },
        "gps_gsv" : {
          "type" : "string",
          "index" : "not_analyzed"
        },
        "gps_rmc" : {
          "type" : "string",
          "index" : "not_analyzed"
        },
        "location" : {
          "type" : "geo_point"
        },
        "oil_press" : {
          "type" : "float",
          "index" : "not_analyzed"
        },
        "oil_temp" : {
          "type" : "integer",
          "index" : "not_analyzed"
        },
        "timestamp" : {
          "type" : "date",
          "format": "YYYY-MM-dd HH:mm:ss"
        },
        "water_temp" : {
          "type" : "integer",
          "index" : "not_analyzed"
        }
      }
    }
  }
}

Indexregistrierung

Registrierungsbefehl.


curl -XPUT moselasticsearch01:9200/log --data-binary @index.json

Starten Sie den Container nach dem Registrieren des Index neu, wenn Sie den Container verlassen. Zu diesem Zeitpunkt sind die Arbeiten am Docker abgeschlossen.

Wenn Sie sich direkt bei der Bulk-API registrieren möchten, können Sie dazu die folgende Datei im Container erstellen und den Befehl ausführen.

log.json


{ "index" : {"_index" : "log","_type" : "carlog"}
{"timestamp":"2016-09-16 19:50:00" ,"battery_level":12.0, "location":{"lon":"144.1","lat":"43.5"}, "water_temp": 90, "oil_temp":80, "oil_press":2.0,  "gps_gsa":"$GPGSA", "gps_rmc":"$GPRMC", "gps_gga":"$GPGGA", "gps_gsv":"$GPGSV"}

Befehl.


curl -XPOST moselasticsearch01:9200/_bulk --data-binary @log.json

raspberry pi Wirf Daten von pi in den Container. Mit Ausnahme der Details erfolgt die Überprüfung durch Ausführen des folgenden Skripts nach Ausführung von "pip install paho-mqtt".

pub.py


#!/usr/bin/env python
import json
import paho.mqtt.client as mqtt
from datetime import datetime

#test_datqa
data = {"timestamp":"" ,"battery_level":0, "location":{"lon":"","lat":""}, "water_temp": 0, "oil_temp":0, "oil_press":0,  "gps_gsa":"", "gps_rmc":"", "gps_gga":"", "gps_gsv":""}

WTMP = 80
OTMP = 90
OPLS = 2.0
BLVL = 12.0
GSA = "$GPGSA"
GGA = "$GPGGA"
RMC ="$GPRMC"
GSV = "$GPGSV"
data["timestamp"]=datetime.now().strftime( '%Y-%m-%d %H:%M:%S' )
data["battery_level"] = BLVL
data["water_temp"] = WTMP
data["oil_temp"] = OTMP
data["oil_press"] = OPLS
data["gps_gsa"] = GSA
data["gps_gga"] = GGA
data["gps_rmc"] = RMC
data["gps_gsv"] = GSV
data["location"]["lon"] = "139.79"
data["location"]["lat"] = "35.67"
json_data = json.dumps(data)

username = "mqtt"
userpass = "mqttpass"
host = "Moskito Server IP"
topic = "car/log"

#main
if __name__ == '__main__':
    client = mqtt.Client()
    client.username_pw_set(username, password=userpass)
    client.connect(host,1883, 60)
    client.publish(topic, payload=json_data, qos=1, retain=False)
    client.disconnect()

Danach greifen wir auf kibana zu und prüfen, ob die Daten kommen. discover.png

Recommended Posts

Speichern Sie die von Moskito auf Docker empfangenen JSON-Daten in DB und Elasticsearch
Daten auf jupyter verschlüsseln und speichern und bei Bedarf entschlüsseln
[Einführung in cx_Oracle] (Teil 6) Zuordnung von DB- und Python-Datentypen
So geben Sie die im Django-Modell enthaltenen Daten im JSON-Format zurück und ordnen sie der Broschüre zu
[Pfeffer] Übergibt die gesamten JSON-Daten, die durch Python-Anforderung erhalten wurden, an das Tablet.
Jetzt ist es an der Zeit, DB mit Docker zu installieren! DB-Installation für Anfänger auf Docker
Süchtig nach Zeichencode durch Einfügen und Extrahieren von Daten mit SQL Alchemy