Dieses Mal werde ich es persönlich verwenden, damit ich nicht über AWS IoT spreche, das Geld kostet, sondern alles auf der Conoha Docker-Maschine bereinige, die ich mit owncloud verwende. (Ich werde die Prozedur nicht schreiben, aber ich werde nur einen bestimmten High-Port nach außen öffnen. Ich plane, über SSL mit Moskito zu kommunizieren.)
mosquitto mosquitto-ssl paho-mqtt pymongo elasticseach BulkAPI Bulk helpers how-to-use-bulk-api-to-store-the-keywords-in-es-by-using-python elasticsearch-py
Ich überprüfe es, daher verwende ich keine Docke-Compose- oder Skriptdateien, es ist meistens manuell. Es ist einfach, also richten wir den Container schnell ein.
Beliebiges Verzeichnis
Befehl.
docker pull busybox
docker run -i -t -v /var/lib/mysql --name busybox01 busybox /bin/sh
mysql Beliebiges Verzeichnis
Dcokerfile.
FROM mysql
EXPOSE 3306
ENV container Docker
ENV MYSQL_ROOT_USER root
ENV MYSQL_ROOT_PASSWORD rootpass
ENV MYSQL_DATABASE log_db
ENV MYSQL_USER test
ENV MYSQL_PASSWORD testpass
Befehl.
docker build -f ./Dockerfile -t mysql:mosql --no-cache=true .
docker run -d --name mosdb01 --volumes-from busybox01 -e MYSQL_ROOT_PASSWORD=rootpass mysql:mosql
Elasticseach Beliebiges Verzeichnis
Befehl.
docker pull elasticsearch
docker run --name moselasticsearch01 -d elasticsearch
Kibana Beliebiges Verzeichnis
Befehl.
docker pull kibana
docker run --name moskibana01 --link moselasticsearch01:elasticsearch -d -p 5601:5601 kibana
mosquitto Beliebiges Verzeichnis Ich werde verschiedene Dinge an diesem Container tun, also habe ich viel hineingesteckt. Löschen Sie diejenigen, die Sie nicht benötigen.
Dockerfile.
FROM ubuntu
MAINTAINER Ototo
ENV container docker
RUN apt-get -y update
RUN apt-get install -y mosquitto
RUN apt-get install -y openssl
RUN apt-get install -y python
RUN apt-get install -y python-pip
RUN apt-get install -y mysql-client
RUN apt-get install -y vim
RUN pip install --upgrade pip
RUN apt-get install -y curl
RUN pip install elasticsearch
RUN pip install pymongo
RUN pip install paho-mqtt
RUN pip install PyMySQL
EXPOSE 1883
ENTRYPOINT service mosquitto start && /bin/bash --login
Befehl.
docker build -f ./Dockerfile -t ubuntu:mosquitto --no-cache=true .
docker run --name mosquitto01 -t -i -d -v /root/mosquitto/mosquitto:/etc/mosquitto --link mosdb01 --link moselasticsearch01 -p 1883:1883 ubuntu:mosquitto /bin/bash
Betritt den Container.
docker attach [conteinarID]
Melden Sie sich bei MySQL an.
mysql -h mosdb01 -u root -p
Enter password: rootpass
Vorerst werde ich nur den Tisch machen.
mysql>use log_db
mysql>CREATE TABLE `log_data` ( `id` int(11) NOT NULL AUTO_INCREMENT,
`date_time` datetime(1),
`data` json NOT NULL,
PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
mysqll>exit
Bearbeiten Sie die Konfiguration.
vim /etc/mosquitto/mosquitto.conf
Fügen Sie Folgendes hinzu, um das Kennwort zu aktivieren
allow_anonymous false
password_file /etc/mosquitto/passwd
Erstellen einer Kennwortdatei und Festlegen von Benutzern und Kennwörtern
mosquitto_passwd -c /etc/mosquitto/passwd mqtt
Password: mqttpass
Reenter password: mqttpass
Neustart des Dienstes (kann am Ende ein Neustart des Containers sein)
service mosquitto restart
Erstellen einer Skriptdatei
vim /etc/mosquitto/sub.py
Skript Ich beschäftige mich hier mit DB und Elasticseach. Da die Ausnahmebehandlung nicht enthalten ist, können Sie sie sicher nach Ihren Wünschen schreiben.
sub.py
#!/usr/bin/env python
import paho.mqtt.client as mqtt
import pymysql.cursors
from pymongo import MongoClient
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
username = "mqtt"
userpass = "mqttpass"
host = "localhost"
topic = "car/log"
eshost = "http://moselasticsearch01:9200"
esindex = "log"
estype = "carlog"
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe(topic, qos=0)
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
set_mysql(msg.payload)
set_es(msg.payload)
#insert mysql
def set_mysql(data):
connection = pymysql.connect(host= 'mosdb01',
user='root',
password='rootpass',
db='log_db',
charset='utf8',
cursorclass=pymysql.cursors.DictCursor)
with connection.cursor() as cursor:
sql = "INSERT INTO log_data(date_time,data) VALUES ( %s, %s)"
r = cursor.execute(sql, ( datetime.now().strftime( '%Y-%m-%d %H:%M:%S' ), data))
print(r) # -> 1
connection.commit()
#insert elasticseach
def set_es(data):
es = Elasticsearch([eshost])
actions = []
action = {"_index": esindex,"_type": estype,"_source": data}
actions.append(action)
log = helpers.bulk(es, actions)
#main
if __name__ == '__main__':
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set(username, password=userpass)
client.connect(host,1883)
client.loop_forever()
Das Starten des Dienstes ist ein Fehljob, mit dem Sie ihn ausführen können, wenn Sie sich bei bash anmelden
vim ~/.bashrc
Fügen Sie Folgendes hinzu
nohup python /etc/mosquitto/sub.py &
Erstellen Sie die folgende Datei, da der Index in elasticseach registriert ist
index.json
{
"mappings" : {
"carlog" : {
"properties" : {
"battery_level" : {
"type" : "float",
"index" : "not_analyzed"
},
"gps_gga" : {
"type" : "string",
"index" : "not_analyzed"
},
"gps_gsa" : {
"type" : "string",
"index" : "not_analyzed"
},
"gps_gsv" : {
"type" : "string",
"index" : "not_analyzed"
},
"gps_rmc" : {
"type" : "string",
"index" : "not_analyzed"
},
"location" : {
"type" : "geo_point"
},
"oil_press" : {
"type" : "float",
"index" : "not_analyzed"
},
"oil_temp" : {
"type" : "integer",
"index" : "not_analyzed"
},
"timestamp" : {
"type" : "date",
"format": "YYYY-MM-dd HH:mm:ss"
},
"water_temp" : {
"type" : "integer",
"index" : "not_analyzed"
}
}
}
}
}
Indexregistrierung
Registrierungsbefehl.
curl -XPUT moselasticsearch01:9200/log --data-binary @index.json
Starten Sie den Container nach dem Registrieren des Index neu, wenn Sie den Container verlassen. Zu diesem Zeitpunkt sind die Arbeiten am Docker abgeschlossen.
Wenn Sie sich direkt bei der Bulk-API registrieren möchten, können Sie dazu die folgende Datei im Container erstellen und den Befehl ausführen.
log.json
{ "index" : {"_index" : "log","_type" : "carlog"}
{"timestamp":"2016-09-16 19:50:00" ,"battery_level":12.0, "location":{"lon":"144.1","lat":"43.5"}, "water_temp": 90, "oil_temp":80, "oil_press":2.0, "gps_gsa":"$GPGSA", "gps_rmc":"$GPRMC", "gps_gga":"$GPGGA", "gps_gsv":"$GPGSV"}
Befehl.
curl -XPOST moselasticsearch01:9200/_bulk --data-binary @log.json
raspberry pi Wirf Daten von pi in den Container. Mit Ausnahme der Details erfolgt die Überprüfung durch Ausführen des folgenden Skripts nach Ausführung von "pip install paho-mqtt".
pub.py
#!/usr/bin/env python
import json
import paho.mqtt.client as mqtt
from datetime import datetime
#test_datqa
data = {"timestamp":"" ,"battery_level":0, "location":{"lon":"","lat":""}, "water_temp": 0, "oil_temp":0, "oil_press":0, "gps_gsa":"", "gps_rmc":"", "gps_gga":"", "gps_gsv":""}
WTMP = 80
OTMP = 90
OPLS = 2.0
BLVL = 12.0
GSA = "$GPGSA"
GGA = "$GPGGA"
RMC ="$GPRMC"
GSV = "$GPGSV"
data["timestamp"]=datetime.now().strftime( '%Y-%m-%d %H:%M:%S' )
data["battery_level"] = BLVL
data["water_temp"] = WTMP
data["oil_temp"] = OTMP
data["oil_press"] = OPLS
data["gps_gsa"] = GSA
data["gps_gga"] = GGA
data["gps_rmc"] = RMC
data["gps_gsv"] = GSV
data["location"]["lon"] = "139.79"
data["location"]["lat"] = "35.67"
json_data = json.dumps(data)
username = "mqtt"
userpass = "mqttpass"
host = "Moskito Server IP"
topic = "car/log"
#main
if __name__ == '__main__':
client = mqtt.Client()
client.username_pw_set(username, password=userpass)
client.connect(host,1883, 60)
client.publish(topic, payload=json_data, qos=1, retain=False)
client.disconnect()
Danach greifen wir auf kibana zu und prüfen, ob die Daten kommen.
Recommended Posts