[PYTHON] Einführung in MQTT (Einführung)

■ Was ist Mücke?

Open Source-Software, die das MQTT-Protokoll implementiert. http://mosquitto.org/

Ein weiteres OSS, das MQTT implementiert, scheint Apache Apollo zu sein.

● Was ist MQTT?

Modellprotokoll Publish / Subscribe (Publish-Subscribe) https://sango.shiguredo.jp/mqtt

Eine Art asynchrones Messaging-Paradigma, mit dem der Absender (Herausgeber) einer Nachricht eine Nachricht senden kann, ohne einen bestimmten Empfänger (Abonnenten) anzunehmen. https://ja.wikipedia.org/wiki/%E5%87%BA%E7%89%88-%E8%B3%BC%E8%AA%AD%E5%9E%8B%E3%83%A2%E3%83%87%E3%83%AB

● Asynchrone Nachricht

Beim asynchronen Messaging werden Nachrichten nacheinander geworfen, ohne auf das Ergebnis zu warten, sodass der Zeitpunkt für die Verarbeitung der Nachrichten nicht übereinstimmt. http://ledsun.hatenablog.com/entry/2013/07/18/181044

■ Einführung von Mücken

● Installation von Mücken

Die folgende Seite wird hilfreich sein. http://dev.classmethod.jp/cloud/setting-up-mosquitto-logging-on-amazon-linux/

[root@localhost tmp]# wget http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-7/home:oojah:mqtt.repo -O "/etc/yum.repos.d/Mosquitto.repo"
[root@localhost tmp]# yum install mosquitto mosquitto-clients
[root@localhost tmp]# ls -litr /etc/yum.repos.d/Mosquitto.repo

[root@localhost tmp]# less /etc/init.d/mosquitto
#! /bin/sh
~~~~~
### BEGIN INIT INFO
# Provides: mosquitto
# Required-Start: $network $remote_fs
# Required-Stop: $network $remote_fs
# Default-Start: 3 5
# Default-Stop: 0 1 2 6
# Short-Description: Mosquitto MQTT broker
# Description: Mosquitto MQTT broker
### END INIT INFO

● Automatische Starteinstellung

○ Registrieren Sie sich beim Daemon

[root@localhost tmp]# /sbin/chkconfig --add mosquitto
[root@localhost tmp]# /sbin/chkconfig --list mosquitto
mosquitto       0:off   1:off   2:off   3:on    4:off   5:on    6:off

○ Stellen Sie den automatischen Start auf der aktuellen Laufstufe ein

[root@localhost tmp]# /sbin/chkconfig mosquitto on
[root@localhost tmp]# /sbin/chkconfig --list mosquitto
mosquitto       0:off   1:off   2:on    3:on    4:on    5:on    6:off

● Starten

[root@localhost tmp]# /etc/init.d/mosquitto start
Starting mosquitto (via systemctl):                        [  OK  ]

● Überwachung durch Überwachung

Installieren Sie monit (Überwachungssoftware) und stellen Sie ein, dass es automatisch startet, wenn die Mücke ausfällt. https://easyengine.io/tutorials/monitoring/monit/

[root@localhost tmp]# cd ~
[root@localhost tmp]# wget http://mmonit.com/monit/dist/binary/5.14/monit-5.14-linux-x64.tar.gz
[root@localhost tmp]# tar zxvf monit-5.14-linux-x64.tar.gz
[root@localhost tmp]# cd monit-5.14/
[root@localhost tmp]# cp bin/monit /usr/bin/monit
[root@localhost tmp]# mkdir /etc/monit
[root@localhost tmp]# touch /etc/monit/monitrc
[root@localhost tmp]# chmod 0700 /etc/monit/monitrc 
[root@localhost tmp]# ln -s /etc/monit/monitrc /etc/monitrc
[root@localhost tmp]# wget https://gist.githubusercontent.com/rahul286/9975061/raw/1aa107e62ecaaa2dacfdb61a12f13efb6f15005b/monit -P /etc/init.d/
[root@localhost tmp]# chmod u+x /etc/init.d/monit
[root@localhost tmp]# echo "START=yes" > /etc/default/monit
[root@localhost tmp]# monit -t
[root@localhost tmp]# /sbin/chkconfig  --add monit
[root@localhost tmp]# /sbin/chkconfig  monit on
[root@localhost tmp]# /sbin/chkconfig --list monit
[root@localhost tmp]# view /etc/monit.d/mosquitto.conf
check process mosquitto with pidfile /var/run/mosquitto.pid
start = "/etc/init.d/mosquitto start"
stop = "/etc/init.d/mosquitto stop"

● Protokollieren Sie die Ausgabeeinstellungen

[root@localhost tmp]# sudo mkdir /var/log/mosquitto
[root@localhost tmp]# sudo chown mosquitto /var/log/mosquitto

[root@localhost tmp]# view /etc/mosquitto/mosquitto.conf
Insgesamt 0
pid_file /var/run/mosquitto.pid

persistence true
persistence_location /var/lib/mosquitto/

log_dest syslog
log_dest file /var/log/mosquitto/mosquitto.log
#log_type debug
log_type error
log_type warning
log_type notice
log_type information
#log_type none
log_type subscribe
log_type unsubscribe
#log_type websockets
#log_type all

connection_messages true

log_timestamp true

include_dir /etc/mosquitto/conf.d

[root@localhost tmp]# /etc/init.d/mosquitto reload

■ Funktionsprüfung

● Test veröffentlichen und abonnieren

Starten Sie zwei Terminals und führen Sie jeweils die folgenden Befehle aus.

Veröffentlichen Sie im Thema Sensoren / Temperatur den Wert 32 mit qos1 für sich und abonnieren Sie ihn.

#Abonnieren
[root@localhost tmp]# mosquitto_sub -t sensors/temperature -q 

#Veröffentlichen
[root@localhost tmp]# mosquitto_pub -t sensors/temperature -m 32 -q 1

#Wenn Sie eine Nachricht mit einem Zeilenvorschubcode senden möchten-Sie können eine Nachricht mit einem Zeilenvorschubcode veröffentlichen, indem Sie den Inhalt der Datei mit der Option f veröffentlichen.
[root@localhost tmp]# mosquitto_pub -t sensors/temperature -f /var/tmp/test.txt

http://mosquitto.org/man/mosquitto_sub-1.html http://mosquitto.org/man/mosquitto_pub-1.html

■ Betreiben Sie MQTT mit Python

Folgendes wird hilfreich sein. https://librabuch.jp/2015/09/mosquiito_paho_python_mqtt/

● Einführung von pip

Installieren Sie pip, das Python-Pakete verwaltet. Es ist einfach, weil Sie Python mit pip mit einem Befehl installieren können.

[root@localhost opt]# curl -kL https://bootstrap.pypa.io/get-pip.py | python curl -kL https://bootstrap.pypa.io/get-pip.py | python
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1487k  100 1487k    0     0  1631k      0 --:--:-- --:--:-- --:--:-- 1630k
Collecting pip
  Downloading pip-8.1.1-py2.py3-none-any.whl (1.2MB)
    100% |████████████████████████████████| 1.2MB 333kB/s 
Collecting wheel
  Downloading wheel-0.29.0-py2.py3-none-any.whl (66kB)
    100% |████████████████████████████████| 71kB 1.9MB/s 
Installing collected packages: pip, wheel
Successfully installed pip-8.1.1 wheel-0.29.0

● Installation von Paho

paho ist eine Eclipse-Bibliothek, die MQTT-Funktionen bietet.

https://eclipse.org/paho/ https://pypi.python.org/pypi/paho-mqtt/1.1

[root@localhost opt]# pip install paho-mqtt
Collecting paho-mqtt
  Downloading paho-mqtt-1.1.tar.gz (41kB)
    100% |████████████████████████████████| 51kB 3.4MB/s 
Building wheels for collected packages: paho-mqtt
  Running setup.py bdist_wheel for paho-mqtt ... done
  Stored in directory: /root/.cache/pip/wheels/97/db/5f/1ddca8ee2f9b58f9bb68208323bd39bb0b177f32f434aa4b95
Successfully built paho-mqtt
Installing collected packages: paho-mqtt
Successfully installed paho-mqtt-1.1

[root@localhost opt]# ls -litr /usr/lib/python2.7/site-packages/paho/mqtt
Insgesamt 196
135479923 -rw-r--r--.1 Wurzel Wurzel 8713 19. März 23:15 publish.py
135479924 -rw-r--r--.1 Wurzel Wurzel 91388 19. März 23:15 client.py
135479925 -rw-r--r--.1 Wurzel Wurzel 20. März 19 23:15 __init__.py
135479926 -rw-r--r--.1 Wurzel Wurzel 170 19. März 23:15 __init__.pyc
135479927 -rw-r--r--.1 Wurzel Wurzel 71288 19. März 23:15 client.pyc
135479928 -rw-r--r--.1 Wurzel Wurzel 8332 19. März 23:15 publish.pyc

● Funktionsprüfung

Bereiten Sie auch Verlage und Abonnenten vor.


from time import sleep
import paho.mqtt.client as mqtt

HOST = '127.0.0.1'
PORT = 1883
KEEP_ALIVE = 60
TOPIC = 'test_topic/test1'
MESSAGE = 'test message'

PUBLISH_NUMBER = 5
SLEEP_TIME = 5

def publish_many_times(client, topic='topic/default', message='default', number=1, time=1, print_flag=False):

    for i in range(number):
        client.publish(topic, message)
        if print_flag == True:
            print (topic + ' ' + message)
        sleep(time)

    client.disconnect()

if __name__ == '__main__':
    client = mqtt.Client(protocol=mqtt.MQTTv311)

    print "publish start " + str(type(client))

    client.connect(HOST, port=PORT, keepalive=KEEP_ALIVE)

    publish_many_times(client,TOPIC, MESSAGE, PUBLISH_NUMBER, SLEEP_TIME)                                      
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt

HOST = '127.0.0.1'
PORT = 1883
KEEP_ALIVE = 60
TOPIC = 'test_topic/test1'

"""
Führen Sie diese aus, wenn Sie versuchen, eine Verbindung herzustellen
def on_connect(client, userdata, flags, respons_code):

* client
Clientklasseninstanz

* userdata
Beim Erstellen einer Instanz einer neuen Client-Klasse mit einem beliebigen Datentyp>Kann eingestellt werden

* flags
Wörterbuch mit Antwortflags
Gültig für Benutzer, deren saubere Sitzung auf 0 gesetzt ist.
Stellen Sie fest, ob die Sitzung noch vorhanden ist.
Wenn die Bereinigungssitzung 0 ist, stellen Sie die Verbindung zum zuvor verbundenen Benutzer wieder her.

0 :Sitzung existiert nicht
1 :Sitzung existiert

* respons_code
Der Antwortcode gibt an, ob die Verbindung erfolgreich war.
0:Erfolgreiche Verbindung
1:Verbindung fehlgeschlagen- incorrect protocol version
2:Verbindung fehlgeschlagen- invalid client identifier
3:Verbindung fehlgeschlagen- server unavailable
4:Verbindung fehlgeschlagen- bad username or password
5:Verbindung fehlgeschlagen- not authorised
"""
def on_connect(client, userdata, flags, respons_code):
    print('status {0}'.format(respons_code))
    client.subscribe(client.topic)

"""
def on_message(client, userdata, message):
Ausführen, wenn das Thema empfangen wird
"""
def on_message(client, userdata, message):
    print(message.topic + ' ' + str(message.payload))

if __name__ == '__main__':

    client = mqtt.Client(protocol=mqtt.MQTTv311)
    client.topic = TOPIC

    client.on_connect = on_connect
    client.on_message = on_message

    client.connect(HOST, port=PORT, keepalive=KEEP_ALIVE)

    #Schleife
    client.loop_forever()                                  
[root@localhost tmp]# python publisher.py 
[root@localhost tmp]# python subscriber.py 
status 0
test_topic/test1 test message
test_topic/test1 test message
test_topic/test1 test message

client.on_connect () und client.on_message () sind Rückruffunktionen In der Schleife in client.loop_forever () wird sie vom Handler aufgerufen und ausgeführt.

Rückrufe finden Sie unter hier.

def _handle_connack(self):
    if self._strict_protocol:
        if self._in_packet['remaining_length'] != 2:
            return MQTT_ERR_PROTOCOL

    if len(self._in_packet['packet']) != 2:
        return MQTT_ERR_PROTOCOL

    (flags, result) = struct.unpack("!BB", self._in_packet['packet'])
    if result == CONNACK_REFUSED_PROTOCOL_VERSION and self._protocol == MQTTv311:
        self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+"), attempting downgrade to MQTT v3.1.")
        # Downgrade to MQTT v3.1
        self._protocol = MQTTv31
        return self.reconnect()

    if result == 0:
        self._state = mqtt_cs_connected

    self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+")")
    self._callback_mutex.acquire()
    if self.on_connect:
        self._in_callback = True

        if sys.version_info[0] < 3:
            argcount = self.on_connect.func_code.co_argcount
        else:
            argcount = self.on_connect.__code__.co_argcount

        if argcount == 3:
            self.on_connect(self, self._userdata, result)
        else:
            flags_dict = dict()
            flags_dict['session present'] = flags & 0x01
            self.on_connect(self, self._userdata, flags_dict, result)
        self._in_callback = False
    self._callback_mutex.release()
    if result == 0:
        rc = 0
        self._out_message_mutex.acquire()
        for m in self._out_messages:
            m.timestamp = time.time()
            if m.state == mqtt_ms_queued:
                self.loop_write() # Process outgoing messages that have just been queued up
                self._out_message_mutex.release()
                return MQTT_ERR_SUCCESS

            if m.qos == 0:
                self._in_callback = True # Don't call loop_write after _send_publish()
                rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
                self._in_callback = False
                if rc != 0:
                    self._out_message_mutex.release()
                    return rc
            elif m.qos == 1:
                if m.state == mqtt_ms_publish:
                    self._inflight_messages = self._inflight_messages + 1
                    m.state = mqtt_ms_wait_for_puback
                    self._in_callback = True # Don't call loop_write after _send_publish()
                    rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
                    self._in_callback = False
                    if rc != 0:
                        self._out_message_mutex.release()
                        return rc
            elif m.qos == 2:
                if m.state == mqtt_ms_publish:
                    self._inflight_messages = self._inflight_messages + 1
                    m.state = mqtt_ms_wait_for_pubrec
                    self._in_callback = True # Don't call loop_write after _send_publish()
                    rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
                    self._in_callback = False
                    if rc != 0:
                        self._out_message_mutex.release()
                        return rc
                elif m.state == mqtt_ms_resend_pubrel:
                    self._inflight_messages = self._inflight_messages + 1
                    m.state = mqtt_ms_wait_for_pubcomp
                    self._in_callback = True # Don't call loop_write after _send_pubrel()
                    rc = self._send_pubrel(m.mid, m.dup)
                    self._in_callback = False
                    if rc != 0:
                        self._out_message_mutex.release()
                        return rc
            self.loop_write() # Process outgoing messages that have just been queued up
        self._out_message_mutex.release()
        return rc
    elif result > 0 and result < 6:
        return MQTT_ERR_CONN_REFUSED
    else:
        return MQTT_ERR_PROTOCOL
def _handle_on_message(self, message):
    self._callback_mutex.acquire()
    matched = False
    for t in self.on_message_filtered:
        if topic_matches_sub(t[0], message.topic):
            self._in_callback = True
            t[1](self,self._userdata,message)
            self._in_callback = False
            matched = True

    if matched == False and self.on_message:
        self._in_callback = True
        self.on_message(self,self._userdata,message)
        self._in_callback = False

    self._callback_mutex.release()

■ Referenzseite

Recommended Posts

Einführung in MQTT (Einführung)
Einführung in Scrapy (1)
Einführung in Scrapy (3)
Erste Schritte mit Supervisor
Einführung in Tkinter 1: Einführung
Einführung in PyQt
Einführung in Scrapy (2)
[Linux] Einführung in Linux
Einführung in Scrapy (4)
Einführung in discord.py (2)
Einführung in Lightning Pytorch
Erste Schritte mit Web Scraping
Einführung in nichtparametrische Felder
Einführung in EV3 / MicroPython
Einführung in die Python-Sprache
Einführung in die TensorFlow-Bilderkennung
Einführung in OpenCV (Python) - (2)
Einführung in PyQt4 Teil 1
Einführung in die Abhängigkeitsinjektion
Einführung in Private Chainer
Einführung in das maschinelle Lernen
AOJ Einführung in die Programmierung Thema Nr. 1, Thema Nr. 2, Thema Nr. 3, Thema Nr. 4
Einführung in das elektronische Papiermodul
Einführung in den Wörterbuch-Suchalgorithmus
Einführung in die Monte-Carlo-Methode
[Lernmemorandum] Einführung in vim
Einführung in PyTorch (1) Automatische Differenzierung
opencv-python Einführung in die Bildverarbeitung
Einführung in Python Django (2) Win
Einführung in das Schreiben von Cython [Notizen]
Einführung in Private TensorFlow
Eine Einführung in das maschinelle Lernen
[Einführung in cx_Oracle] Übersicht über cx_Oracle
Eine super Einführung in Linux
Einführung
AOJ Einführung in die Programmierung Thema Nr. 7, Thema Nr. 8
Einführung in die Anomalieerkennung 1 Grundlagen
Einführung in RDB mit sqlalchemy Ⅰ
[Einführung in Systre] Fibonacci Retracement ♬
Einführung in die nichtlineare Optimierung (I)
Einführung in die serielle Kommunikation [Python]
AOJ Einführung in die Programmierung Thema Nr. 5, Thema Nr. 6
Einführung in Deep Learning ~ Lernregeln ~
[Einführung in Python] <Liste> [Bearbeiten: 22.02.2020]
Einführung in Python (Python-Version APG4b)
Eine Einführung in die Python-Programmierung
[Einführung in cx_Oracle] (8.) Version cx_Oracle 8.0
Einführung in discord.py (3) Verwenden von Stimme
Einführung in die Bayes'sche Optimierung
Tiefe Stärkung des Lernens 1 Einführung in die Stärkung des Lernens
Super Einführung in das maschinelle Lernen
Einführung in Ansible Teil In'Inventory '
Serie: Einführung in den Inhalt von cx_Oracle
[Einführung] Verwendung von open3d
Einführung in Python For, While
Einführung in Deep Learning ~ Backpropagation ~
Einführung in Ansible Teil ④'Variable '