Cette fois, je vais l'utiliser personnellement, donc je ne parlerai pas d'AWS IoT, qui coûte de l'argent, mais je vais tout nettoyer sur la machine docker conoha que j'utilise avec owncloud. (Je n'écrirai pas la procédure, mais je n'ouvrirai qu'un port haut spécifique vers l'extérieur. Je prévois de communiquer avec mosquitto par SSL)
mosquitto mosquitto-ssl paho-mqtt pymongo elasticseach BulkAPI Bulk helpers how-to-use-bulk-api-to-store-the-keywords-in-es-by-using-python elasticsearch-py
Je le vérifie, donc je n'utilise pas de fichiers docke-compose ou script, c'est principalement manuel. C'est facile, alors configurons le conteneur rapidement.
Annuaire arbitraire
commander.
docker pull busybox
docker run -i -t -v /var/lib/mysql --name busybox01 busybox /bin/sh
mysql Annuaire arbitraire
Dcokerfile.
FROM mysql
EXPOSE 3306
ENV container Docker
ENV MYSQL_ROOT_USER root
ENV MYSQL_ROOT_PASSWORD rootpass
ENV MYSQL_DATABASE log_db
ENV MYSQL_USER test
ENV MYSQL_PASSWORD testpass
commander.
docker build -f ./Dockerfile -t mysql:mosql --no-cache=true .
docker run -d --name mosdb01 --volumes-from busybox01 -e MYSQL_ROOT_PASSWORD=rootpass mysql:mosql
Elasticseach Annuaire arbitraire
commander.
docker pull elasticsearch
docker run --name moselasticsearch01 -d elasticsearch
Kibana Annuaire arbitraire
commander.
docker pull kibana
docker run --name moskibana01 --link moselasticsearch01:elasticsearch -d -p 5601:5601 kibana
mosquitto Annuaire arbitraire Je vais faire diverses choses sur ce conteneur, donc j'en ai mis beaucoup. Supprimez ceux dont vous n'avez pas besoin.
Dockerfile.
FROM ubuntu
MAINTAINER Ototo
ENV container docker
RUN apt-get -y update
RUN apt-get install -y mosquitto
RUN apt-get install -y openssl
RUN apt-get install -y python
RUN apt-get install -y python-pip
RUN apt-get install -y mysql-client
RUN apt-get install -y vim
RUN pip install --upgrade pip
RUN apt-get install -y curl
RUN pip install elasticsearch
RUN pip install pymongo
RUN pip install paho-mqtt
RUN pip install PyMySQL
EXPOSE 1883
ENTRYPOINT service mosquitto start && /bin/bash --login
commander.
docker build -f ./Dockerfile -t ubuntu:mosquitto --no-cache=true .
docker run --name mosquitto01 -t -i -d -v /root/mosquitto/mosquitto:/etc/mosquitto --link mosdb01 --link moselasticsearch01 -p 1883:1883 ubuntu:mosquitto /bin/bash
Entrez dans le conteneur.
docker attach [conteinarID]
Connectez-vous à mysql.
mysql -h mosdb01 -u root -p
Enter password: rootpass
Pour le moment, je ne ferai que le tableau.
mysql>use log_db
mysql>CREATE TABLE `log_data` ( `id` int(11) NOT NULL AUTO_INCREMENT,
`date_time` datetime(1),
`data` json NOT NULL,
PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
mysqll>exit
Modifiez le fichier config.
vim /etc/mosquitto/mosquitto.conf
Ajoutez ce qui suit pour activer le mot de passe
allow_anonymous false
password_file /etc/mosquitto/passwd
Création d'un fichier de mots de passe et définition des utilisateurs et des mots de passe
mosquitto_passwd -c /etc/mosquitto/passwd mqtt
Password: mqttpass
Reenter password: mqttpass
Redémarrage du service (peut être le redémarrage du conteneur à la fin)
service mosquitto restart
Créer un fichier de script
vim /etc/mosquitto/sub.py
scénario Je fouille dans db et elasticseach ici. La gestion des exceptions n'étant pas incluse, vous pouvez l'écrire comme vous le souhaitez.
sub.py
#!/usr/bin/env python
import paho.mqtt.client as mqtt
import pymysql.cursors
from pymongo import MongoClient
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
username = "mqtt"
userpass = "mqttpass"
host = "localhost"
topic = "car/log"
eshost = "http://moselasticsearch01:9200"
esindex = "log"
estype = "carlog"
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe(topic, qos=0)
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
set_mysql(msg.payload)
set_es(msg.payload)
#insert mysql
def set_mysql(data):
connection = pymysql.connect(host= 'mosdb01',
user='root',
password='rootpass',
db='log_db',
charset='utf8',
cursorclass=pymysql.cursors.DictCursor)
with connection.cursor() as cursor:
sql = "INSERT INTO log_data(date_time,data) VALUES ( %s, %s)"
r = cursor.execute(sql, ( datetime.now().strftime( '%Y-%m-%d %H:%M:%S' ), data))
print(r) # -> 1
connection.commit()
#insert elasticseach
def set_es(data):
es = Elasticsearch([eshost])
actions = []
action = {"_index": esindex,"_type": estype,"_source": data}
actions.append(action)
log = helpers.bulk(es, actions)
#main
if __name__ == '__main__':
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set(username, password=userpass)
client.connect(host,1883)
client.loop_forever()
Le lancement du service est un travail de défaite qui vous permet de le faire lorsque vous vous connectez à bash
vim ~/.bashrc
Ajoutez ce qui suit
nohup python /etc/mosquitto/sub.py &
Créez le fichier suivant car l'index est enregistré dans elasticseach
index.json
{
"mappings" : {
"carlog" : {
"properties" : {
"battery_level" : {
"type" : "float",
"index" : "not_analyzed"
},
"gps_gga" : {
"type" : "string",
"index" : "not_analyzed"
},
"gps_gsa" : {
"type" : "string",
"index" : "not_analyzed"
},
"gps_gsv" : {
"type" : "string",
"index" : "not_analyzed"
},
"gps_rmc" : {
"type" : "string",
"index" : "not_analyzed"
},
"location" : {
"type" : "geo_point"
},
"oil_press" : {
"type" : "float",
"index" : "not_analyzed"
},
"oil_temp" : {
"type" : "integer",
"index" : "not_analyzed"
},
"timestamp" : {
"type" : "date",
"format": "YYYY-MM-dd HH:mm:ss"
},
"water_temp" : {
"type" : "integer",
"index" : "not_analyzed"
}
}
}
}
}
Enregistrement d'index
Commande d'enregistrement.
curl -XPUT moselasticsearch01:9200/log --data-binary @index.json
Après avoir enregistré l'index, redémarrez le conteneur lorsque vous quittez le conteneur. À ce stade, le travail sur le docker est terminé.
Si vous souhaitez vous inscrire directement auprès de l'API Bulk, vous pouvez le faire en créant le fichier suivant sur le conteneur et en exécutant la commande.
log.json
{ "index" : {"_index" : "log","_type" : "carlog"}
{"timestamp":"2016-09-16 19:50:00" ,"battery_level":12.0, "location":{"lon":"144.1","lat":"43.5"}, "water_temp": 90, "oil_temp":80, "oil_press":2.0, "gps_gsa":"$GPGSA", "gps_rmc":"$GPRMC", "gps_gga":"$GPGGA", "gps_gsv":"$GPGSV"}
commander.
curl -XPOST moselasticsearch01:9200/_bulk --data-binary @log.json
raspberry pi
Jetez les données de pi vers le conteneur.
Sauf pour les détails, la vérification se fait en exécutant le script suivant après avoir exécuté pip install paho-mqtt
.
pub.py
#!/usr/bin/env python
import json
import paho.mqtt.client as mqtt
from datetime import datetime
#test_datqa
data = {"timestamp":"" ,"battery_level":0, "location":{"lon":"","lat":""}, "water_temp": 0, "oil_temp":0, "oil_press":0, "gps_gsa":"", "gps_rmc":"", "gps_gga":"", "gps_gsv":""}
WTMP = 80
OTMP = 90
OPLS = 2.0
BLVL = 12.0
GSA = "$GPGSA"
GGA = "$GPGGA"
RMC ="$GPRMC"
GSV = "$GPGSV"
data["timestamp"]=datetime.now().strftime( '%Y-%m-%d %H:%M:%S' )
data["battery_level"] = BLVL
data["water_temp"] = WTMP
data["oil_temp"] = OTMP
data["oil_press"] = OPLS
data["gps_gsa"] = GSA
data["gps_gga"] = GGA
data["gps_rmc"] = RMC
data["gps_gsv"] = GSV
data["location"]["lon"] = "139.79"
data["location"]["lat"] = "35.67"
json_data = json.dumps(data)
username = "mqtt"
userpass = "mqttpass"
host = "IP du serveur mosquitto"
topic = "car/log"
#main
if __name__ == '__main__':
client = mqtt.Client()
client.username_pw_set(username, password=userpass)
client.connect(host,1883, 60)
client.publish(topic, payload=json_data, qos=1, retain=False)
client.disconnect()
Après cela, accédons à kibana et vérifions si les données arrivent.
Recommended Posts