[PYTHON] Après avoir implémenté l'application Watson IoT Platform avec Flask, j'étais accro à la connexion MQTT

Voici un résumé de ce à quoi j'étais accro à la mise en œuvre des applications Watson IoT Platform avec Flask. Parce que je ne comprenais pas très bien Flask, j'ai regretté de devoir le comprendre et l'utiliser correctement.

environnement

Plusieurs interfaces de programmation sont fournies pour les applications Watson IoT Platform, mais cette fois, j'ai créé une application qui s'abonne à Device Status qui rapporte l'état de connexion des appareils IoT à l'aide de MQTT.

Finalement, j'ai voulu pousser le code vers IBM Cloud Foundry, alors j'ai décidé de l'implémenter dans Flask, mais je devrais avoir une solide compréhension de Flask ici ...

Python 2.7 paho-mqtt 1.5.0 Flask 1.1.2

Préparation

Création d'une application avec IBM Cloud Foundry

Créez une application publique en vous référant à l'article suivant. https://cloud.ibm.com/docs/cloud-foundry-public?topic=cloud-foundry-public-getting-started

Obtenez l'exemple de code qui sort dans les étapes ci-dessus à partir de git et modifiez-le afin qu'il puisse répondre aux requêtes HTTP pour le moment.

hello.py


from flask import Flask
import os

app = Flask(__name__, static_url_path='')
port = int(os.getenv('PORT', 8000))

@app.route('/')
def root():
    return 'Hello, world'

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=port, debug=True)

Essayez d'exécuter ce programme localement.

# python hello.py
 * Serving Flask app "hello" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: on
 * Running on http://0.0.0.0:8000/ (Press CTRL+C to quit)
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 210-659-291

J'essaierai d'y accéder avec curl pour confirmation. Il semble que vous puissiez répondre avec succès aux requêtes HTTP sur le port 8000.

# curl localhost:8000
Hello, world

Vous disposez désormais d'une application qui peut au moins répondre aux requêtes HTTP.

Création de clés API et de jetons pour Watson IoT Platform

Watson IoT Platform dispose d'un client MQTT pour publier les informations collectées par les appareils IoT (il existe deux méthodes, appelées respectivement appareils et passerelles), et il reçoit et traite les données collectées par les appareils IoT et vérifie l'état des appareils IoT. Deux types de connexion client sont possibles: client MQTT (application) à exploiter et.

Cette fois, connectez-vous en tant qu'application et abonnez-vous à la rubrique pour recevoir une notification des modifications d'état des appareils gérées par Watson IoT Platform. Plus d'informations sur la connexion peuvent être trouvées sur le lien ci-dessous. https://www.ibm.com/support/knowledgecenter/en/SSQP8H/iot/platform/applications/mqtt.html

Pour commencer, créez une clé API et un jeton pour établir une connexion MQTT à Watson IoT Platform en tant qu'application. Veuillez vous référer au lien ci-dessous pour continuer. https://www.ibm.com/support/knowledgecenter/en/SSQP8H/iot/platform/applications/app_dev_index.html

En établissant une connexion MQTT à l'aide de la clé API et du jeton créés, vous devriez pouvoir être informé des changements d'état de l'appareil chaque fois qu'un appareil IoT se connecte ou se déconnecte de Watson IoT Platform.

J'ai implémenté l'application ...

Ajoutez le code pour vous connecter à Watson IoT Platform avec la clé API et le jeton créés à hello.py à l'aide de Flask.

Pour établir une connexion MQTT à Watson IoT Platform, vous devez spécifier l'ID client, l'ID utilisateur et le mot de passe. Créez en concaténant l'ID d'organisation, la clé API et le jeton attribués par Watson IoT Platform et chacun dans le format spécifié. Ce format est décrit en détail dans l'authentification MQTT sur le lien ci-dessous. https://www.ibm.com/support/knowledgecenter/en/SSQP8H/iot/platform/applications/mqtt.html

Dans l'exemple de code ci-dessous, l'ID de l'organisation est "oooooo ", la clé API est" kkkkkkkkkk "et le jeton est "tttttttttttttttttt", mais vous devez le remplacer par la valeur fournie par Watson IoT Platform pour exécuter le code.

hello.py


from flask import Flask
import os

import paho.mqtt.client as mqtt
from datetime import datetime

def on_connect(client, userdata, flags, respons_code):
    client.on_message = on_message
    client.subscribe('iot-2/type/+/id/+/mon')
    print(datetime.now().strftime("%Y/%m/%d %H:%M:%S") + ": mqtt connected")

def on_disconnect(client, userdata, rc):
    print(datetime.now().strftime("%Y/%m/%d %H:%M:%S") + ": mqtt disconnected")

def on_message(client, userdata, msg):
    print(datetime.now().strftime("%Y/%m/%d %H:%M:%S") + ": mqtt message arrived")

client = mqtt.Client(client_id='a:oooooo:appl1', protocol=mqtt.MQTTv311)
client.username_pw_set('a-oooooo-kkkkkkkkkk', password='tttttttttttttttttt')
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.connect('de.messaging.internetofthings.ibmcloud.com', 1883, 120)
client.loop_start()

app = Flask(__name__, static_url_path='')
port = int(os.getenv('PORT', 8000))

@app.route('/')
def root():
    return "Hello, world"

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=port, debug=True)

J'ai décidé de préparer un tel code et de vérifier le fonctionnement. L'espoir est que vous serez en mesure de confirmer la réception du message MQTT tout en répondant aux requêtes HTTP du port 8000.

Mais quand vous exécutez réellement le code ...

# python hello.py
 * Serving Flask app "hello" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: on
 * Running on http://0.0.0.0:8000/ (Press CTRL+C to quit)
 * Restarting with stat
2020/05/13 18:13:26: mqtt connected
 * Debugger is active!
 * Debugger PIN: 210-659-291
2020/05/13 18:13:26: mqtt message arrived
2020/05/13 18:13:27: mqtt disconnected
2020/05/13 18:13:27: mqtt connected
2020/05/13 18:13:27: mqtt message arrived
2020/05/13 18:13:28: mqtt connected
2020/05/13 18:13:28: mqtt disconnected
2020/05/13 18:13:29: mqtt message arrived
2020/05/13 18:13:30: mqtt disconnected
2020/05/13 18:13:30: mqtt connected
2020/05/13 18:13:30: mqtt message arrived
2020/05/13 18:13:31: mqtt disconnected
2020/05/13 18:13:31: mqtt connected

De cette manière, bien que la connexion avec MQTT ait été établie une fois, elle a été immédiatement coupée. Après cela, paho-mqtt se reconnecte automatiquement en raison d'une déconnexion inattendue de la session, mais il se déconnecte également immédiatement, puis se reconnecte et se déconnecte à plusieurs reprises.

Si la clé ou le jeton API est incorrect, la connexion MQTT devrait échouer en premier lieu, et j'ai essayé d'ajuster l'intervalle des messages pour voir si le keepalive MQTT avait expiré, mais cela n'a pas non plus conduit à une amélioration. C'était.

Je me suis demandé ce qui n'allait pas avec le chemin de communication et je l'ai laissé pendant quelques jours, mais les symptômes n'ont pas changé.

Solution

Lorsque j'étais perdu et que j'avais posté une question sur stackoverflow, deux personnes m'ont informé que l'ID client pouvait être dupliqué.

Vous ne pouvez certainement pas vous connecter à MQTT Broker avec le même identifiant client. Je pensais que c'était vrai, mais je ne vois aucune raison à la duplication. hello.py n'exécute qu'un seul processus à la fois. En outre, la règle est que l'ID client est généré en concaténant l'ID d'organisation et la clé API, mais l'ID d'organisation ne détourne pas la clé API à d'autres fins. Pour une raison quelconque, j'ai pensé que les anciennes informations de connexion pouvaient rester sur Watson IoT Platform et je ne pouvais pas me connecter avec elle.J'ai donc essayé de générer une nouvelle clé API, mais le symptôme ne change pas même si je l'utilise. fait.

Apparemment, j'abandonnais de penser que l'ID client en double n'était pas la cause, mais quand j'ai regardé à nouveau la sortie de hello.py, j'ai vu la chaîne "Redémarrer avec stat".

Au fait, qu'est-ce que c'est?

Jusqu'à présent, je me méfiais de la partie MQTT, mais j'ai décidé d'enquêter sur ce mystérieux message de Flask.

résultat,

--Lorsque Flask est exécuté avec le mode de débogage activé, il existe un mécanisme pour détecter les changements dans le code qui compose l'application et redémarrer automatiquement l'application.

J'en suis venu à comprendre cela.

Quand je le vérifie,

# ps -f | grep hello.py
  501 20745  2576   0 10:39PM ttys005    0:00.35 /System/Library/Frameworks/Python.framework/Versions/2.7/Resources/Python.app/Contents/MacOS/Python hello.py
  501 20748 20745   0 10:39PM ttys005    0:00.36 /System/Library/Frameworks/Python.framework/Versions/2.7/Resources/Python.app/Contents/MacOS/Python /Users/(réduction)/hello.py

Il existe certainement deux processus hello.py en cours d'exécution dans une relation parent-enfant.

Dans cette situation, l'ID client en double est convaincant. Le processus parent (pid: 20745) et le processus enfant (pid: 20748) utilisent le même ID client à peu près au même moment pour entrer dans la connexion MQTT. Bien, je vois.

Alors, comment éviter les identifiants clients en double? Il est facile de désactiver le mode de débogage, mais c'est dommage que vous ne puissiez pas utiliser les fonctionnalités utiles fournies. Une enquête plus approfondie a révélé que le processus parent était "WEERKZUG_RUN_MAIN" avant d'appeler le processus enfant. Cela semble être correct si vous vérifiez cette variable d'environnement et ne faites pas de connexion MQTT dans le processus parent.

Correction de code et contrôle de fonctionnement

Basé sur la compréhension ci-dessus du mode de débogage

--Vérifiez la variable d'environnement "WEERKZUG_RUN_MAIN" avant de connecter MQTT, et connectez-vous uniquement si cette variable d'environnement est définie. --Déconnectez MQTT avant de quitter le processus et attendez la fin de la déconnexion.

J'ai à nouveau modifié hello.py comme ça. Le deuxième correctif est que lorsque le processus parent détecte un changement de code et le recharge, il arrête le processus enfant 1 et démarre un nouveau processus enfant 2, mais n'attend pas que la connexion MQTT du processus enfant 1 soit déconnectée. La même chose semble se produire lorsque le processus enfant 2 est lancé, j'ai donc décidé de l'ajouter juste au cas où.

hello.py


from flask import Flask
import os
import threading
import json

import paho.mqtt.client as mqtt
from datetime import datetime

def on_connect(client, userdata, flags, respons_code):
    client.on_message = on_message
    client.subscribe('iot-2/type/+/id/+/mon')
    print(datetime.now().strftime("%Y/%m/%d %H:%M:%S") + ": mqtt connected")

cond = threading.Condition()
notified = False

def on_disconnect(client, userdata, rc):
    global notified
    print(datetime.now().strftime("%Y/%m/%d %H:%M:%S") + ": mqtt disconnected")
    with cond:
        notified = True
        cond.notify()

status = 'Unknown'

def on_message(client, userdata, msg):
    global status
    response_json = msg.payload.decode("utf-8")
    response_dict = json.loads(response_json)
    if(response_dict["ClientID"] == 'Spécifiez ici l'ID client de la passerelle d'appareil que vous souhaitez suivre'):
        if( response_dict["Action"] == "Disconnect" ):
            status = "Disconnected"
        elif( response_dict["Action"] == "Connect" ):
            status = "Connected"

if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
    client = mqtt.Client(client_id='a:oooooo:appl1', protocol=mqtt.MQTTv311)
    client.username_pw_set('a-oooooo-kkkkkkkkkk', password='tttttttttttttttttt')
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.connect('de.messaging.internetofthings.ibmcloud.com', 1883, 120)
    client.loop_start()

app = Flask(__name__, static_url_path='')
port = int(os.getenv('PORT', 8000))

@app.route('/')
def root():
    return "Status: " + status

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=port, debug=True)

    if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
        client.loop_stop()
        client.disconnect()
        with cond:
            if( not notified ):
                cond.wait()

J'ai modifié le code comme ci-dessus et essayé de l'exécuter à nouveau.

# python hello.py
 * Serving Flask app "hello" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: on
 * Running on http://0.0.0.0:8000/ (Press CTRL+C to quit)
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 210-659-291
2020/05/13 23:04:28: mqtt connected
127.0.0.1 - - [13/May/2020 23:04:38] "GET / HTTP/1.1" 200 -

La connexion / déconnexion répétée s'est arrêtée!

Quand je fais une requête HTTP avec curl,

# curl localhost:8000
Status: Disconnected

Il semble que le traitement des messages MQTT fonctionne également bien.

Essayez de connecter un appareil IoT enregistré à titre d'essai.

Cette fois, nous utiliserons mosquitto_sub pour nous abonner à la commande Device pour remplacer l'appareil IoT. Pour plus de détails sur la configuration et la connexion, consultez l'article lié ci-dessous. https://qiita.com/kuraoka/items/5380f6b5e97e8cd1ad98

# mosquitto_sub -h de.messaging.internetofthings.ibmcloud.com -u use-token-auth -P "tttttttttttttttttt" -i (hello.Identifiant client surveillé par py) -t 'iot-2/type/(type)/id/(id)/cmd/control/fmt/json'

Avec la connexion MQTT de l'appareil IoT comme décrit ci-dessus, essayez à nouveau de faire une requête HTTP avec curl.

# curl localhost:8000
Status: Connected

Cette fois, le statut Connecté est renvoyé. Par connexion MQTT avec mosquitto_sub, il semble que le message d'état du périphérique a été envoyé à hello.py et que la variable d'état a été modifiée dans la fonction on_message ().

Tout cela est comme prévu.

Résumé

J'ai essayé d'utiliser Flask pour la première fois cette fois. Lorsque j'ai essayé de créer le plus petit code à partir de l'exemple de code et d'ajouter la partie que je voulais implémenter à partir de là, j'ai été pris dans un endroit inattendu. Il y a aussi l'idée que j'ai étudiée pour ce genre d'essais et d'erreurs, mais il semble que j'aurais dû l'étudier d'abord et ensuite l'utiliser.

Watson IoT Platform est un service cloud puissant permettant de gérer et de contrôler les appareils IoT et de collecter des données.

La partie qui télécharge les données collectées par l'appareil IoT dans le cloud est la base du système, mais je souhaite implémenter une application qui surveille l'état de l'appareil IoT et le contrôle si nécessaire sur le cloud. J'y ai réfléchi et j'ai pensé à cette étude. Maintenant que nous avons confirmé le fonctionnement minimum de la partie de surveillance de l'état, nous souhaitons étendre notre portée pour étudier le contrôle des appareils IoT en envoyant des commandes Device.

Recommended Posts

Après avoir implémenté l'application Watson IoT Platform avec Flask, j'étais accro à la connexion MQTT
J'étais accro à essayer logging.getLogger avec Flask 1.1.x
J'étais accro à Flask sur dotCloud
Ce à quoi j'étais accro lors de la création d'applications Web dans un environnement Windows
J'étais accro au grattage avec Selenium (+ Python) en 2020
Ce à quoi j'étais accro avec json.dumps dans l'encodage base64 de Python
J'étais accro aux variables de classe et aux variables d'instance erronées en Python
J'étais accro au multitraitement + psycopg2
Le nom du fichier était mauvais en Python et j'étais accro à l'importation
Ce que j'étais accro à Python autorun
[Introduction à json] Non, j'en étais accro. .. .. ♬
Python: j'ai pu récurer en lambda
J'étais sobrement accro à appeler awscli à partir d'un script Python 2.7 enregistré dans crontab
Notez que j'étais accro au script npm ne passant pas dans l'environnement de vérification
Ce à quoi j'étais accro en combinant l'héritage de classe et l'héritage de table commune dans SQLAlchemy
Je veux faire la transition avec un bouton sur le ballon
Une histoire à laquelle j'étais accro chez np.where
Python: peut être répété en lambda
Ce à quoi j'étais accro lors de l'utilisation de Python tornado
Quand j'ai essayé d'installer PIL et matplotlib dans un environnement virtualenv, j'en étais accro.
Ce à quoi j'étais accro en traitant d'énormes fichiers dans un environnement Linux 32 bits
Une histoire à laquelle j'étais accro en spécifiant nil comme argument de fonction dans Go