This time it's for personal use, so instead of AWS IoT (which costs money) I'll set everything up on the ConoHa Docker host I already use for ownCloud. (I won't describe the server setup itself; I only open one specific high port to the outside, and I plan to talk to mosquitto over SSL.)
Since I'm still experimenting, I don't use docker-compose or script files; most steps are manual. It's all straightforward, so let's get the containers up quickly.
Arbitrary directory
command.
docker pull busybox
docker run -i -t -v /var/lib/mysql --name busybox01 busybox /bin/sh
mysql Arbitrary directory
Dockerfile.
FROM mysql
EXPOSE 3306
ENV container Docker
ENV MYSQL_ROOT_USER root
ENV MYSQL_ROOT_PASSWORD rootpass
ENV MYSQL_DATABASE log_db
ENV MYSQL_USER test
ENV MYSQL_PASSWORD testpass
command.
docker build -f ./Dockerfile -t mysql:mosql --no-cache=true .
docker run -d --name mosdb01 --volumes-from busybox01 -e MYSQL_ROOT_PASSWORD=rootpass mysql:mosql
Elasticsearch Arbitrary directory
command.
docker pull elasticsearch
docker run --name moselasticsearch01 -d elasticsearch
Kibana Arbitrary directory
command.
docker pull kibana
docker run --name moskibana01 --link moselasticsearch01:elasticsearch -d -p 5601:5601 kibana
mosquitto Arbitrary directory. I'll be doing various things on this container, so it installs quite a lot. Remove anything you don't need.
Dockerfile.
FROM ubuntu
MAINTAINER Ototo
ENV container docker
RUN apt-get -y update
RUN apt-get install -y mosquitto
RUN apt-get install -y openssl
RUN apt-get install -y python
RUN apt-get install -y python-pip
RUN apt-get install -y mysql-client
RUN apt-get install -y vim
RUN pip install --upgrade pip
RUN apt-get install -y curl
RUN pip install elasticsearch
RUN pip install pymongo
RUN pip install paho-mqtt
RUN pip install PyMySQL
EXPOSE 1883
ENTRYPOINT service mosquitto start && /bin/bash --login
command.
docker build -f ./Dockerfile -t ubuntu:mosquitto --no-cache=true .
docker run --name mosquitto01 -t -i -d -v /root/mosquitto/mosquitto:/etc/mosquitto --link mosdb01 --link moselasticsearch01 -p 1883:1883 ubuntu:mosquitto /bin/bash
Enter the container.
docker attach [containerID]
Log in to mysql.
mysql -h mosdb01 -u root -p
Enter password: rootpass
For now, just create the table.
mysql> use log_db
mysql> CREATE TABLE `log_data` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `date_time` datetime(1),
    `data` json NOT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
mysql> exit
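The `data` column is a MySQL json column, so whatever the subscriber inserts there has to be a valid JSON string. A minimal sketch of the two INSERT parameters the subscriber will build (the payload values here are made up for illustration):

```python
import json
from datetime import datetime

# Hypothetical sample payload; in practice this arrives over MQTT as a
# JSON string and goes straight into the json column.
payload = json.dumps({"water_temp": 90, "oil_temp": 80})

# The same (date_time, data) pair the subscriber passes to its INSERT:
params = (datetime.now().strftime('%Y-%m-%d %H:%M:%S'), payload)

# MySQL rejects non-JSON strings for a json column, so a quick local
# parse mirrors the server-side validation:
json.loads(params[1])  # raises ValueError if the string is not valid JSON
print(params[1])
```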
Edit the config.
vim /etc/mosquitto/mosquitto.conf
Add the following to enable password authentication:
allow_anonymous false
password_file /etc/mosquitto/passwd
Create password file and set user and password
mosquitto_passwd -c /etc/mosquitto/passwd mqtt
Password: mqttpass
Reenter password: mqttpass
Restart the service (or just restart the container at the end):
service mosquitto restart
Create the script file:
vim /etc/mosquitto/sub.py
script. This script writes into both the DB and Elasticsearch. Exception handling is omitted, so add whatever handling you need.
sub.py
#!/usr/bin/env python
import paho.mqtt.client as mqtt
import pymysql.cursors
from pymongo import MongoClient
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime

username = "mqtt"
userpass = "mqttpass"
host = "localhost"
topic = "car/log"
eshost = "http://moselasticsearch01:9200"
esindex = "log"
estype = "carlog"

def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    client.subscribe(topic, qos=0)

def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))
    set_mysql(msg.payload)
    set_es(msg.payload)

# insert into mysql
def set_mysql(data):
    connection = pymysql.connect(host='mosdb01',
                                 user='root',
                                 password='rootpass',
                                 db='log_db',
                                 charset='utf8',
                                 cursorclass=pymysql.cursors.DictCursor)
    with connection.cursor() as cursor:
        sql = "INSERT INTO log_data(date_time,data) VALUES ( %s, %s)"
        r = cursor.execute(sql, (datetime.now().strftime('%Y-%m-%d %H:%M:%S'), data))
        print(r)  # -> 1
    connection.commit()

# insert into elasticsearch
def set_es(data):
    es = Elasticsearch([eshost])
    actions = []
    action = {"_index": esindex, "_type": estype, "_source": data}
    actions.append(action)
    log = helpers.bulk(es, actions)

# main
if __name__ == '__main__':
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    client.username_pw_set(username, password=userpass)
    client.connect(host, 1883)
    client.loop_forever()
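One thing to keep in mind: paho-mqtt hands `msg.payload` to `on_message` as bytes, so under Python 3 you may want to decode it before working with it as a dict. A minimal sketch (the payload value here is invented):

```python
import json

# Hypothetical raw payload, as paho-mqtt would deliver it to on_message:
raw_payload = b'{"water_temp": 90, "oil_press": 2.0}'

# Decode bytes -> str -> dict before inserting or indexing:
doc = json.loads(raw_payload.decode('utf-8'))
print(doc["water_temp"])  # -> 90
```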
Starting the script is, admittedly, a bit of a cop-out: I just launch it whenever I log in to bash.
vim ~/.bashrc
Add the following
nohup python /etc/mosquitto/sub.py &
Create the following file to register the index in Elasticsearch.
index.json
{
  "mappings" : {
    "carlog" : {
      "properties" : {
        "battery_level" : {
          "type" : "float",
          "index" : "not_analyzed"
        },
        "gps_gga" : {
          "type" : "string",
          "index" : "not_analyzed"
        },
        "gps_gsa" : {
          "type" : "string",
          "index" : "not_analyzed"
        },
        "gps_gsv" : {
          "type" : "string",
          "index" : "not_analyzed"
        },
        "gps_rmc" : {
          "type" : "string",
          "index" : "not_analyzed"
        },
        "location" : {
          "type" : "geo_point"
        },
        "oil_press" : {
          "type" : "float",
          "index" : "not_analyzed"
        },
        "oil_temp" : {
          "type" : "integer",
          "index" : "not_analyzed"
        },
        "timestamp" : {
          "type" : "date",
          "format": "YYYY-MM-dd HH:mm:ss"
        },
        "water_temp" : {
          "type" : "integer",
          "index" : "not_analyzed"
        }
      }
    }
  }
}
Index registration
Registration command.
curl -XPUT moselasticsearch01:9200/log --data-binary @index.json
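The `timestamp` field's date format in the mapping has to line up with what the scripts actually emit. As a sanity check (not part of the original procedure), the `strftime` pattern used by sub.py and pub.py produces exactly the `YYYY-MM-dd HH:mm:ss` shape declared in index.json:

```python
import re
from datetime import datetime

# The same strftime pattern the pub/sub scripts use:
ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(ts)

# Shape check against the mapping's "YYYY-MM-dd HH:mm:ss" date format:
assert re.match(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$', ts)
```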
After registering the index, exit the container and restart it. At this point the work on the Docker side is complete.
If you want to register documents directly via the Bulk API instead, create the following file on the container and run the command below.
log.json
{ "index" : {"_index" : "log","_type" : "carlog"} }
{"timestamp":"2016-09-16 19:50:00" ,"battery_level":12.0, "location":{"lon":"144.1","lat":"43.5"}, "water_temp": 90, "oil_temp":80, "oil_press":2.0, "gps_gsa":"$GPGSA", "gps_rmc":"$GPRMC", "gps_gga":"$GPGGA", "gps_gsv":"$GPGSV"}
command.
curl -XPOST moselasticsearch01:9200/_bulk --data-binary @log.json
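The same bulk request can also be assembled in Python. A sketch of how the action line and source line pair up (index and type names follow the setup above; the document values are made up):

```python
import json

# Each Bulk API entry is two NDJSON lines: an action line, then a source line.
action = {"index": {"_index": "log", "_type": "carlog"}}
source = {"timestamp": "2016-09-16 19:50:00", "water_temp": 90}

# The body must be newline-delimited and end with a trailing newline,
# otherwise Elasticsearch rejects the request.
body = json.dumps(action) + "\n" + json.dumps(source) + "\n"
print(body)
```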
raspberry pi
Now throw data from the Pi at the container. For a quick check, without going into details, run the following script after doing pip install paho-mqtt.
pub.py
#!/usr/bin/env python
import json
import paho.mqtt.client as mqtt
from datetime import datetime

# test data
data = {"timestamp": "", "battery_level": 0, "location": {"lon": "", "lat": ""},
        "water_temp": 0, "oil_temp": 0, "oil_press": 0,
        "gps_gsa": "", "gps_rmc": "", "gps_gga": "", "gps_gsv": ""}
WTMP = 80
OTMP = 90
OPLS = 2.0
BLVL = 12.0
GSA = "$GPGSA"
GGA = "$GPGGA"
RMC = "$GPRMC"
GSV = "$GPGSV"
data["timestamp"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
data["battery_level"] = BLVL
data["water_temp"] = WTMP
data["oil_temp"] = OTMP
data["oil_press"] = OPLS
data["gps_gsa"] = GSA
data["gps_gga"] = GGA
data["gps_rmc"] = RMC
data["gps_gsv"] = GSV
data["location"]["lon"] = "139.79"
data["location"]["lat"] = "35.67"
json_data = json.dumps(data)

username = "mqtt"
userpass = "mqttpass"
host = "mosquitto server IP"
topic = "car/log"

# main
if __name__ == '__main__':
    client = mqtt.Client()
    client.username_pw_set(username, password=userpass)
    client.connect(host, 1883, 60)
    client.publish(topic, payload=json_data, qos=1, retain=False)
    client.disconnect()
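If you want several test messages rather than one, the payload construction above can be pulled into a small helper and called in a loop. A sketch, with made-up values (the helper name and the varying temperatures are my own, not part of the original script):

```python
import json
from datetime import datetime

# Hypothetical helper that builds payloads the same way pub.py does,
# so multiple test messages can be generated and published in a loop.
def make_payload(water_temp, oil_temp):
    data = {"timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            "battery_level": 12.0,
            "location": {"lon": "139.79", "lat": "35.67"},
            "water_temp": water_temp, "oil_temp": oil_temp, "oil_press": 2.0,
            "gps_gsa": "$GPGSA", "gps_rmc": "$GPRMC",
            "gps_gga": "$GPGGA", "gps_gsv": "$GPGSV"}
    return json.dumps(data)

payloads = [make_payload(80 + i, 90 + i) for i in range(3)]
# Each payload would then go to client.publish(topic, payload=..., qos=1)
print(len(payloads))  # -> 3
```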
After that, access Kibana and check whether the data is arriving.