[PYTHON] Introduction à MQTT (Introduction)

■ Qu'est-ce que le moustique?

Logiciel open source qui implémente le protocole MQTT. http://mosquitto.org/

Un autre OSS qui implémente MQTT semble être Apache Apollo.

● Qu'est-ce que MQTT?

Protocole de modèle de publication / abonnement (publication-abonnement) https://sango.shiguredo.jp/mqtt

Un type de paradigme de messagerie asynchrone qui permet à un expéditeur de message (éditeur) d'envoyer un message sans supposer un destinataire spécifique (abonné). https://ja.wikipedia.org/wiki/%E5%87%BA%E7%89%88-%E8%B3%BC%E8%AA%AD%E5%9E%8B%E3%83%A2%E3%83%87%E3%83%AB

● Message asynchrone

Dans la messagerie asynchrone, les messages sont lancés les uns après les autres sans attendre le résultat, de sorte que le moment du traitement des messages ne correspond pas. http://ledsun.hatenablog.com/entry/2013/07/18/181044

■ Introduction du moustique

● Installation de mosquitto

Le site suivant sera utile. http://dev.classmethod.jp/cloud/setting-up-mosquitto-logging-on-amazon-linux/

[root@localhost tmp]# wget http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-7/home:oojah:mqtt.repo -O "/etc/yum.repos.d/Mosquitto.repo"
[root@localhost tmp]# yum install mosquitto mosquitto-clients
[root@localhost tmp]# ls -litr /etc/yum.repos.d/Mosquitto.repo

[root@localhost tmp]# less /etc/init.d/mosquitto
#! /bin/sh
~~~~~
### BEGIN INIT INFO
# Provides: mosquitto
# Required-Start: $network $remote_fs
# Required-Stop: $network $remote_fs
# Default-Start: 3 5
# Default-Stop: 0 1 2 6
# Short-Description: Mosquitto MQTT broker
# Description: Mosquitto MQTT broker
### END INIT INFO

● Paramètre de démarrage automatique

○ Inscrivez-vous avec le démon

[root@localhost tmp]# /sbin/chkconfig --add mosquitto
[root@localhost tmp]# /sbin/chkconfig --list mosquitto
mosquitto       0:off   1:off   2:off   3:on    4:off   5:on    6:off

○ Configuré pour démarrer automatiquement au niveau d'exécution actuel

[root@localhost tmp]# /sbin/chkconfig mosquitto on
[root@localhost tmp]# /sbin/chkconfig --list mosquitto
mosquitto       0:off   1:off   2:on    3:on    4:on    5:on    6:off

● Démarrer

[root@localhost tmp]# /etc/init.d/mosquitto start
Starting mosquitto (via systemctl):                        [  OK  ]

● Surveillance par monit

Installez monit (logiciel de surveillance) et configurez-le pour qu'il démarre automatiquement lorsque le moustique tombe en panne. https://easyengine.io/tutorials/monitoring/monit/

[root@localhost tmp]# cd ~
[root@localhost tmp]# wget http://mmonit.com/monit/dist/binary/5.14/monit-5.14-linux-x64.tar.gz
[root@localhost tmp]# tar zxvf monit-5.14-linux-x64.tar.gz
[root@localhost tmp]# cd monit-5.14/
[root@localhost tmp]# cp bin/monit /usr/bin/monit
[root@localhost tmp]# mkdir /etc/monit
[root@localhost tmp]# touch /etc/monit/monitrc
[root@localhost tmp]# chmod 0700 /etc/monit/monitrc 
[root@localhost tmp]# ln -s /etc/monit/monitrc /etc/monitrc
[root@localhost tmp]# wget https://gist.githubusercontent.com/rahul286/9975061/raw/1aa107e62ecaaa2dacfdb61a12f13efb6f15005b/monit -P /etc/init.d/
[root@localhost tmp]# chmod u+x /etc/init.d/monit
[root@localhost tmp]# echo "START=yes" > /etc/default/monit
[root@localhost tmp]# monit -t
[root@localhost tmp]# /sbin/chkconfig  --add monit
[root@localhost tmp]# /sbin/chkconfig  monit on
[root@localhost tmp]# /sbin/chkconfig --list monit
[root@localhost tmp]# view /etc/monit.d/mosquitto.conf
check process mosquitto with pidfile /var/run/mosquitto.pid
start = "/etc/init.d/mosquitto start"
stop = "/etc/init.d/mosquitto stop"

● Paramètres de sortie du journal

[root@localhost tmp]# sudo mkdir /var/log/mosquitto
[root@localhost tmp]# sudo chown mosquitto /var/log/mosquitto

[root@localhost tmp]# view /etc/mosquitto/mosquitto.conf
Total 0
pid_file /var/run/mosquitto.pid

persistence true
persistence_location /var/lib/mosquitto/

log_dest syslog
log_dest file /var/log/mosquitto/mosquitto.log
#log_type debug
log_type error
log_type warning
log_type notice
log_type information
#log_type none
log_type subscribe
log_type unsubscribe
#log_type websockets
#log_type all

connection_messages true

log_timestamp true

include_dir /etc/mosquitto/conf.d

[root@localhost tmp]# /etc/init.d/mosquitto reload

■ Contrôle de fonctionnement

● Publier et souscrire un test

Lancez deux terminaux et exécutez les commandes suivantes pour chacun.

À partir du sujet capteurs / température, publiez la valeur 32 pour vous-même avec qos1 et abonnez-vous.

#Souscrire
[root@localhost tmp]# mosquitto_sub -t sensors/temperature -q 

#Publier
[root@localhost tmp]# mosquitto_pub -t sensors/temperature -m 32 -q 1

#Si vous souhaitez envoyer un message contenant un code de saut de ligne-Vous pouvez publier un message contenant un code de saut de ligne en publiant le contenu du fichier à l'aide de l'option f.
[root@localhost tmp]# mosquitto_pub -t sensors/temperature -f /var/tmp/test.txt

http://mosquitto.org/man/mosquitto_sub-1.html http://mosquitto.org/man/mosquitto_pub-1.html

■ Manipuler MQTT à l'aide de Python

Ce qui suit sera utile. https://librabuch.jp/2015/09/mosquiito_paho_python_mqtt/

● Introduction de pip

Installez pip qui gère les packages python. C'est facile car vous pouvez installer python en utilisant pip avec une seule commande.

[root@localhost opt]# curl -kL https://bootstrap.pypa.io/get-pip.py | python curl -kL https://bootstrap.pypa.io/get-pip.py | python
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1487k  100 1487k    0     0  1631k      0 --:--:-- --:--:-- --:--:-- 1630k
Collecting pip
  Downloading pip-8.1.1-py2.py3-none-any.whl (1.2MB)
    100% |████████████████████████████████| 1.2MB 333kB/s 
Collecting wheel
  Downloading wheel-0.29.0-py2.py3-none-any.whl (66kB)
    100% |████████████████████████████████| 71kB 1.9MB/s 
Installing collected packages: pip, wheel
Successfully installed pip-8.1.1 wheel-0.29.0

● Installation de paho

paho est une bibliothèque Eclipse qui fournit la fonctionnalité MQTT.

https://eclipse.org/paho/ https://pypi.python.org/pypi/paho-mqtt/1.1

[root@localhost opt]# pip install paho-mqtt
Collecting paho-mqtt
  Downloading paho-mqtt-1.1.tar.gz (41kB)
    100% |████████████████████████████████| 51kB 3.4MB/s 
Building wheels for collected packages: paho-mqtt
  Running setup.py bdist_wheel for paho-mqtt ... done
  Stored in directory: /root/.cache/pip/wheels/97/db/5f/1ddca8ee2f9b58f9bb68208323bd39bb0b177f32f434aa4b95
Successfully built paho-mqtt
Installing collected packages: paho-mqtt
Successfully installed paho-mqtt-1.1

[root@localhost opt]# ls -litr /usr/lib/python2.7/site-packages/paho/mqtt
Total 196
135479923 -rw-r--r--.1 racine racine 8713 19 mars 23:15 publish.py
135479924 -rw-r--r--.1 racine racine 91388 19 mars 23:15 client.py
135479925 -rw-r--r--.1 racine racine 20 mars 19 23:15 __init__.py
135479926 -rw-r--r--.1 racine racine 170 19 mars 23:15 __init__.pyc
135479927 -rw-r--r--.1 racine racine 71288 19 mars 23:15 client.pyc
135479928 -rw-r--r--.1 racine racine 8332 19 mars 23:15 publish.pyc

● Contrôle de fonctionnement

Préparez également les éditeurs et les abonnés.


from time import sleep
import paho.mqtt.client as mqtt

HOST = '127.0.0.1'
PORT = 1883
KEEP_ALIVE = 60
TOPIC = 'test_topic/test1'
MESSAGE = 'test message'

PUBLISH_NUMBER = 5
SLEEP_TIME = 5

def publish_many_times(client, topic='topic/default', message='default', number=1, time=1, print_flag=False):

    for i in range(number):
        client.publish(topic, message)
        if print_flag == True:
            print (topic + ' ' + message)
        sleep(time)

    client.disconnect()

if __name__ == '__main__':
    client = mqtt.Client(protocol=mqtt.MQTTv311)

    print "publish start " + str(type(client))

    client.connect(HOST, port=PORT, keepalive=KEEP_ALIVE)

    publish_many_times(client,TOPIC, MESSAGE, PUBLISH_NUMBER, SLEEP_TIME)                                      
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt

HOST = '127.0.0.1'
PORT = 1883
KEEP_ALIVE = 60
TOPIC = 'test_topic/test1'

"""
Exécuter en essayant de se connecter
def on_connect(client, userdata, flags, respons_code):

* client
Instance de classe client

* userdata
Lors de la création d'une instance d'une nouvelle classe Client avec n'importe quel type de données>Peut être mis en place

* flags
Dictionnaire avec indicateurs de réponse
Valable pour les utilisateurs dont la session propre est définie sur 0.
Déterminez si la session existe toujours.
Si la session de nettoyage est 0, reconnectez-vous à l'utilisateur précédemment connecté.

0 :La session n'existe pas
1 :La session existe

* respons_code
Le code de réponse indique si la connexion a réussi.
0:Connexion réussie
1:La connexion a échoué- incorrect protocol version
2:La connexion a échoué- invalid client identifier
3:La connexion a échoué- server unavailable
4:La connexion a échoué- bad username or password
5:La connexion a échoué- not authorised
"""
def on_connect(client, userdata, flags, respons_code):
    print('status {0}'.format(respons_code))
    client.subscribe(client.topic)

"""
def on_message(client, userdata, message):
Exécuter lorsque le sujet est reçu
"""
def on_message(client, userdata, message):
    print(message.topic + ' ' + str(message.payload))

if __name__ == '__main__':

    client = mqtt.Client(protocol=mqtt.MQTTv311)
    client.topic = TOPIC

    client.on_connect = on_connect
    client.on_message = on_message

    client.connect(HOST, port=PORT, keepalive=KEEP_ALIVE)

    #boucle
    client.loop_forever()                                  
[root@localhost tmp]# python publisher.py 
[root@localhost tmp]# python subscriber.py 
status 0
test_topic/test1 test message
test_topic/test1 test message
test_topic/test1 test message

client.on_connect () et client.on_message () sont des fonctions de rappel Dans la boucle à l'intérieur de client.loop_forever (), il est appelé et exécuté par le gestionnaire.

Voir ici pour les rappels.

def _handle_connack(self):
    if self._strict_protocol:
        if self._in_packet['remaining_length'] != 2:
            return MQTT_ERR_PROTOCOL

    if len(self._in_packet['packet']) != 2:
        return MQTT_ERR_PROTOCOL

    (flags, result) = struct.unpack("!BB", self._in_packet['packet'])
    if result == CONNACK_REFUSED_PROTOCOL_VERSION and self._protocol == MQTTv311:
        self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+"), attempting downgrade to MQTT v3.1.")
        # Downgrade to MQTT v3.1
        self._protocol = MQTTv31
        return self.reconnect()

    if result == 0:
        self._state = mqtt_cs_connected

    self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+")")
    self._callback_mutex.acquire()
    if self.on_connect:
        self._in_callback = True

        if sys.version_info[0] < 3:
            argcount = self.on_connect.func_code.co_argcount
        else:
            argcount = self.on_connect.__code__.co_argcount

        if argcount == 3:
            self.on_connect(self, self._userdata, result)
        else:
            flags_dict = dict()
            flags_dict['session present'] = flags & 0x01
            self.on_connect(self, self._userdata, flags_dict, result)
        self._in_callback = False
    self._callback_mutex.release()
    if result == 0:
        rc = 0
        self._out_message_mutex.acquire()
        for m in self._out_messages:
            m.timestamp = time.time()
            if m.state == mqtt_ms_queued:
                self.loop_write() # Process outgoing messages that have just been queued up
                self._out_message_mutex.release()
                return MQTT_ERR_SUCCESS

            if m.qos == 0:
                self._in_callback = True # Don't call loop_write after _send_publish()
                rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
                self._in_callback = False
                if rc != 0:
                    self._out_message_mutex.release()
                    return rc
            elif m.qos == 1:
                if m.state == mqtt_ms_publish:
                    self._inflight_messages = self._inflight_messages + 1
                    m.state = mqtt_ms_wait_for_puback
                    self._in_callback = True # Don't call loop_write after _send_publish()
                    rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
                    self._in_callback = False
                    if rc != 0:
                        self._out_message_mutex.release()
                        return rc
            elif m.qos == 2:
                if m.state == mqtt_ms_publish:
                    self._inflight_messages = self._inflight_messages + 1
                    m.state = mqtt_ms_wait_for_pubrec
                    self._in_callback = True # Don't call loop_write after _send_publish()
                    rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
                    self._in_callback = False
                    if rc != 0:
                        self._out_message_mutex.release()
                        return rc
                elif m.state == mqtt_ms_resend_pubrel:
                    self._inflight_messages = self._inflight_messages + 1
                    m.state = mqtt_ms_wait_for_pubcomp
                    self._in_callback = True # Don't call loop_write after _send_pubrel()
                    rc = self._send_pubrel(m.mid, m.dup)
                    self._in_callback = False
                    if rc != 0:
                        self._out_message_mutex.release()
                        return rc
            self.loop_write() # Process outgoing messages that have just been queued up
        self._out_message_mutex.release()
        return rc
    elif result > 0 and result < 6:
        return MQTT_ERR_CONN_REFUSED
    else:
        return MQTT_ERR_PROTOCOL
def _handle_on_message(self, message):
    self._callback_mutex.acquire()
    matched = False
    for t in self.on_message_filtered:
        if topic_matches_sub(t[0], message.topic):
            self._in_callback = True
            t[1](self,self._userdata,message)
            self._in_callback = False
            matched = True

    if matched == False and self.on_message:
        self._in_callback = True
        self.on_message(self,self._userdata,message)
        self._in_callback = False

    self._callback_mutex.release()

■ Site de référence

Recommended Posts

Introduction à MQTT (Introduction)
Introduction à Scrapy (1)
Introduction à Scrapy (3)
Premiers pas avec Supervisor
Introduction à Tkinter 1: Introduction
Introduction à PyQt
Introduction à Scrapy (2)
[Linux] Introduction à Linux
Introduction à Scrapy (4)
Introduction à discord.py (2)
Introduction à Lightning Pytorch
Premiers pas avec le Web Scraping
Introduction aux baies non paramétriques
Introduction à EV3 / MicroPython
Introduction au langage Python
Introduction à la reconnaissance d'image TensorFlow
Introduction à OpenCV (python) - (2)
Introduction à PyQt4 Partie 1
Introduction à l'injection de dépendances
Introduction à Private Chainer
Introduction à l'apprentissage automatique
AOJ Introduction à la programmation Sujet 1, Sujet 2, Sujet 3, Sujet 4
Introduction au module de papier électronique
Introduction à l'algorithme de recherche de dictionnaire
Introduction à la méthode Monte Carlo
[Mémorandum d'apprentissage] Introduction à vim
Introduction à PyTorch (1) Différenciation automatique
opencv-python Introduction au traitement d'image
Introduction à Python Django (2) Win
Introduction à l'écriture de Cython [Notes]
Introduction à Private TensorFlow
Une introduction à l'apprentissage automatique
[Introduction à cx_Oracle] Présentation de cx_Oracle
Une super introduction à Linux
introduction
AOJ Introduction à la programmation Sujet n ° 7, Sujet n ° 8
Introduction à la détection des anomalies 1 principes de base
Introduction à RDB avec sqlalchemy Ⅰ
[Introduction au système] Retracement de Fibonacci ♬
Introduction à l'optimisation non linéaire (I)
Introduction à la communication série [Python]
AOJ Introduction à la programmation Sujet n ° 5, Sujet n ° 6
Introduction au Deep Learning ~ Règles d'apprentissage ~
[Introduction à Python] <liste> [modifier le 22/02/2020]
Introduction à Python (version Python APG4b)
Une introduction à la programmation Python
[Introduction à cx_Oracle] (8e) version de cx_Oracle 8.0
Introduction à discord.py (3) Utilisation de la voix
Introduction à l'optimisation bayésienne
Apprentissage par renforcement profond 1 Introduction au renforcement de l'apprentissage
Super introduction à l'apprentissage automatique
Introduction à Ansible Part «Inventaire»
Série: Introduction à cx_Oracle Contents
[Introduction] Comment utiliser open3d
Introduction à Python pour, pendant
Introduction au Deep Learning ~ Rétropropagation ~
Introduction à Ansible Part ④'Variable '