[PYTHON] Lorsqu'un mot-clé apparaît dans une rubrique à laquelle vous êtes abonné par MQTT, publiez-le dans une autre rubrique

【Aperçu】

J'ai écrit ce processus plusieurs fois, et finalement j'ai eu l'idée de le réutiliser. L'image ressemble à la figure ci-dessous, avec S'abonner pour le sujet A sur la gauche et Publier pour le sujet B sur la droite.

Kobito.jocseL.png

Dans le script, le sujet de gauche est exprimé sous la forme «SUB_TOPIC» et le sujet de droite est exprimé sous la forme «PUB_TOPIC». À propos, l'hôte peut être différent du côté SUB et du côté PUB, j'ai donc essayé d'avoir les informations de connexion du côté SUB et du côté PUB respectivement.

【scénario】

Comme ça.

sample.py


#!/usr/bin/python
# encoding:utf-8

import paho.mqtt.client as mqtt
import re
import sys

#Paramètres liés au sujet auxquels ce script s'abonne
SUB_TOPIC = "subtopic"
SUB_USERNAME = 'username'
SUB_PASSWORD = 'password'
SUB_HOST = 'localhost'
SUB_PORT = 1883

#Paramètres liés au sujet que ce script publie
PUB_TOPIC = "pubtopic"
PUB_USERNAME = 'username'
PUB_PASSWORD = 'password'
PUB_HOST = 'localhost'
PUB_PORT = 1883

#Lorsque ce mot clé apparaît dans la rubrique secondaire Sub, sortie dans la rubrique latérale Pub
KEYWORD = "TEST"

#Drapeaux pour le contrôle de sortie de débogage
ISDEBUG = True

def sub_on_connect(client, userdata, flags, rc):
    client.subscribe(SUB_TOPIC)

def sub_on_message(client, userdata, msg):
    #Le message reçu est msg.Stocké dans la charge utile
    debug_echo(msg.payload)

    match = re.match(KEYWORD, msg.payload)

    if match is None:
        debug_echo("Not Match")
    else:
        debug_echo("Match")
        publish("OK")


def publish(msg):
    pub_client.publish(PUB_TOPIC, msg)
    pass


def debug_echo(msg):
    if ISDEBUG :
        print msg
        sys.stderr.write(msg+"\n")

if __name__ == '__main__':
    #Préparation du client expéditeur
    pub_client = mqtt.Client(protocol=mqtt.MQTTv311)
    pub_client.username_pw_set(PUB_USERNAME, password=PUB_PASSWORD)
    pub_client.connect(PUB_HOST, port=PUB_PORT, keepalive=60)

    #Préparation du client bénéficiaire
    sub_client = mqtt.Client(protocol=mqtt.MQTTv311)
    sub_client.on_connect = sub_on_connect
    sub_client.on_message = sub_on_message
    sub_client.username_pw_set(SUB_USERNAME, password=SUB_PASSWORD)
    sub_client.connect(SUB_HOST, port=SUB_PORT, keepalive=60)

    #Démarrer la boucle de réception
    sub_client.loop_forever()

Addendum: modifié pour afficher le contenu de débogage dans la sortie d'erreur standard.

Recommended Posts

Lorsqu'un mot-clé apparaît dans une rubrique à laquelle vous êtes abonné par MQTT, publiez-le dans une autre rubrique
Que faire lorsqu'un avertissement apparaît autour de l'intégration de Python dans CheckHealth de Neovim
[openpyxl] Que faire lorsque IllegalCharacterError apparaît dans pandas.DataFrame.to_excel
Choses à noter lors de l'initialisation d'une liste en Python
Lorsque vous voulez plt.save dans l'instruction for
Publier / télécharger une bibliothèque créée en Python vers PyPI
Que faire lorsque "SSL: CERTIFICATE_VERIFY_FAILED _ssl.c: 1056" apparaît en Python
Que faire lorsque "En-tête HTTP_HOST non valide" apparaît dans Django
[Sous-processus] Lorsque vous souhaitez exécuter un autre programme Python en code Python
Spécification du fuseau horaire lors de la conversion d'une chaîne de caractères en type datetime avec python
[Python] Lorsque vous souhaitez utiliser toutes les variables dans un autre fichier
Lors de la création d'une matrice dans une liste
Lors de l'écriture d'un programme en Python
[Mémorandum] Que faire lorsqu'un avertissement apparaît après l'exécution de pip list
Comment écrire une chaîne de caractères lorsqu'il y a plusieurs lignes en python
Que faire lorsqu'un message d'avertissement est affiché dans la liste des pip