[PYTHON] After implementing a Watson IoT Platform application in Flask, I got stuck on the MQTT connection

Here is a summary of where I got stuck while implementing a Watson IoT Platform application in Flask. I did not understand Flask very well, and I came away regretting that I had not learned it properly before using it.

Environment

Watson IoT Platform provides several programming interfaces for applications. This time I used MQTT to create an application that subscribes to Device Status, which reports the connection status of IoT devices.

I eventually wanted to push the code to IBM Cloud Foundry, so I decided to implement it in Flask. In hindsight, this is where I should have made sure I had a solid understanding of Flask ...

Python 2.7
paho-mqtt 1.5.0
Flask 1.1.2

Preparation

Creating applications with IBM Cloud Foundry

Create a public application by referring to the following article. https://cloud.ibm.com/docs/cloud-foundry-public?topic=cloud-foundry-public-getting-started

Get the sample code referenced in the steps above from git and modify it so that, for now, it simply responds to HTTP requests.

hello.py


from flask import Flask
import os

app = Flask(__name__, static_url_path='')
port = int(os.getenv('PORT', 8000))

@app.route('/')
def root():
    return 'Hello, world'

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=port, debug=True)

Try running this program locally.

# python hello.py
 * Serving Flask app "hello" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: on
 * Running on http://0.0.0.0:8000/ (Press CTRL+C to quit)
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 210-659-291

To confirm, I access it with curl. The application successfully responds to HTTP requests on port 8000.

# curl localhost:8000
Hello, world

You now have an application that can at least respond to HTTP requests.

Creating API Keys and Tokens for Watson IoT Platform

Watson IoT Platform accepts two types of MQTT client connections: clients that publish the information collected by IoT devices (these come in two kinds, devices and gateways), and clients (applications) that receive and process that data and that check and operate on the status of IoT devices.

This time, I connect as an application and subscribe to the topic that delivers notifications of status changes for the devices managed by Watson IoT Platform. More information on the connection can be found at the link below. https://www.ibm.com/support/knowledgecenter/en/SSQP8H/iot/platform/applications/mqtt.html

To get started, create an API key and token to make an MQTT connection to Watson IoT Platform as an application. Please refer to the link below to proceed. https://www.ibm.com/support/knowledgecenter/en/SSQP8H/iot/platform/applications/app_dev_index.html

By making an MQTT connection using the created API key and token, you should be able to be notified of Device Status changes each time an IoT device connects to or disconnects from the Watson IoT Platform.

I implemented the application ...

Starting from the Flask version of hello.py, add code that connects to Watson IoT Platform with the created API key and token.

To make an MQTT connection to Watson IoT Platform, you need to specify a client id, a userid, and a password. Each is created by concatenating the organization id, API key, and token assigned by Watson IoT Platform in a specified format. This format is described in detail under MQTT authentication at the link below. https://www.ibm.com/support/knowledgecenter/en/SSQP8H/iot/platform/applications/mqtt.html

In the sample code below, the organization id is 'oooooo', the API key is 'kkkkkkkkkk', and the token is 'tttttttttttttttttt'. You need to change them to the values provided by Watson IoT Platform when you run the code.
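The concatenation can be sketched as a small helper. This is only an illustration that mirrors the placeholder values used in this article's hello.py; the function name watson_app_credentials is hypothetical, 'appl1' is simply the application id chosen here, and the authoritative format is the one in the IBM documentation linked above.

```python
def watson_app_credentials(org_id, app_id, api_key, token):
    """Build (client_id, username, password) for an application connection.

    Assumption: the layout follows the values used in this article's
    hello.py, i.e. 'a:<org>:<app>' for the client id and
    'a-<org>-<key>' for the username. The token is used as the password.
    """
    client_id = 'a:{0}:{1}'.format(org_id, app_id)
    username = 'a-{0}-{1}'.format(org_id, api_key)
    return client_id, username, token


# Placeholder values, as in the article; replace them with real ones.
client_id, username, password = watson_app_credentials(
    'oooooo', 'appl1', 'kkkkkkkkkk', 'tttttttttttttttttt')
print(client_id)  # a:oooooo:appl1
print(username)   # a-oooooo-kkkkkkkkkk
```

The three values would then be passed to mqtt.Client(client_id=...) and client.username_pw_set(username, password=...).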

hello.py


from flask import Flask
import os

import paho.mqtt.client as mqtt
from datetime import datetime

def on_connect(client, userdata, flags, respons_code):
    client.on_message = on_message
    client.subscribe('iot-2/type/+/id/+/mon')
    print(datetime.now().strftime("%Y/%m/%d %H:%M:%S") + ": mqtt connected")

def on_disconnect(client, userdata, rc):
    print(datetime.now().strftime("%Y/%m/%d %H:%M:%S") + ": mqtt disconnected")

def on_message(client, userdata, msg):
    print(datetime.now().strftime("%Y/%m/%d %H:%M:%S") + ": mqtt message arrived")

client = mqtt.Client(client_id='a:oooooo:appl1', protocol=mqtt.MQTTv311)
client.username_pw_set('a-oooooo-kkkkkkkkkk', password='tttttttttttttttttt')
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.connect('de.messaging.internetofthings.ibmcloud.com', 1883, 120)
client.loop_start()

app = Flask(__name__, static_url_path='')
port = int(os.getenv('PORT', 8000))

@app.route('/')
def root():
    return "Hello, world"

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=port, debug=True)

I prepared the code above and checked how it behaves. The expectation is that it will respond to HTTP requests on port 8000 while also showing that MQTT messages are being received.

But when you actually run the code ...

# python hello.py
 * Serving Flask app "hello" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: on
 * Running on http://0.0.0.0:8000/ (Press CTRL+C to quit)
 * Restarting with stat
2020/05/13 18:13:26: mqtt connected
 * Debugger is active!
 * Debugger PIN: 210-659-291
2020/05/13 18:13:26: mqtt message arrived
2020/05/13 18:13:27: mqtt disconnected
2020/05/13 18:13:27: mqtt connected
2020/05/13 18:13:27: mqtt message arrived
2020/05/13 18:13:28: mqtt connected
2020/05/13 18:13:28: mqtt disconnected
2020/05/13 18:13:29: mqtt message arrived
2020/05/13 18:13:30: mqtt disconnected
2020/05/13 18:13:30: mqtt connected
2020/05/13 18:13:30: mqtt message arrived
2020/05/13 18:13:31: mqtt disconnected
2020/05/13 18:13:31: mqtt connected

In this way, although the MQTT connection was established once, it was cut off immediately. paho-mqtt treats this as an unexpected disconnection and reconnects automatically, but that connection is also dropped immediately, and the cycle of reconnecting and disconnecting repeats.

If the API key or token were wrong, the MQTT connection should have failed in the first place. I also tried adjusting the keepalive interval in case the MQTT keepalive was timing out, but that did not improve things either.

I wondered whether something was wrong with the network path and left it for a few days, but the symptoms did not change.

Solution

I was at a loss and posted a question on Stack Overflow, where two people advised me that the client id might be duplicated.

It is true that two clients cannot stay connected to an MQTT broker with the same client id. That made sense, but I could not think of any reason for a duplicate. As far as I knew, hello.py was running as only one process at a time. Also, the client id is built by concatenation according to the documented rule, and I had not reused the organization id or API key for any other purpose. Thinking that stale connection information might be left on Watson IoT Platform and blocking the connection, I generated a new API key, but the symptom did not change.

I had all but concluded that a duplicate client id was not the cause, but when I looked at the output of hello.py again, I noticed the string 'Restarting with stat'.

By the way, what is this?

I've been suspicious of the MQTT part so far, but I decided to investigate this mysterious message of Flask.

As a result, I came to understand the following.

- When Flask is run with debug mode enabled, it detects changes in the code that makes up the application and automatically restarts the application.
- To do this, Flask implicitly starts a child process. The parent process is in charge of monitoring code changes and restarting the child process, and the child process handles HTTP requests as the web application.

Checking the running processes confirms this:

# ps -f | grep hello.py
  501 20745  2576   0 10:39PM ttys005    0:00.35 /System/Library/Frameworks/Python.framework/Versions/2.7/Resources/Python.app/Contents/MacOS/Python hello.py
  501 20748 20745   0 10:39PM ttys005    0:00.36 /System/Library/Frameworks/Python.framework/Versions/2.7/Resources/Python.app/Contents/MacOS/Python /Users/(abridgement)/hello.py

There are indeed two hello.py processes running, a parent and a child.

In this situation, a duplicate client id makes sense. The parent process (pid: 20745) and the child process (pid: 20748) both open an MQTT connection with the same client id at about the same time. I see.

So how do you avoid the duplicate client id? Disabling debug mode is the easy way, but it would be a shame to lose its useful features. Further investigation revealed that the parent process sets the environment variable 'WERKZEUG_RUN_MAIN' before launching the child process. It should therefore be enough to check this environment variable and not make the MQTT connection in the parent process.
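The check can be sketched as a small predicate. The helper names below are hypothetical, and the second function goes slightly beyond what this article does: it assumes you also want the connection to be made when the reloader is turned off, in which case there is only one process and 'WERKZEUG_RUN_MAIN' is not set.

```python
import os


def in_reloader_child(environ=None):
    """Return True only in the child process started by the reloader.

    The reloader sets WERKZEUG_RUN_MAIN to 'true' in the child's
    environment; the monitoring parent process does not have it.
    """
    if environ is None:
        environ = os.environ
    return environ.get('WERKZEUG_RUN_MAIN') == 'true'


def should_connect_mqtt(use_reloader, environ=None):
    """Decide whether this process should open the MQTT connection.

    Assumption: without the reloader there is a single process, so it
    connects; with the reloader only the child process connects.
    """
    if not use_reloader:
        return True
    return in_reloader_child(environ)


if __name__ == '__main__':
    # With app.run(debug=True) the reloader is on by default.
    print(should_connect_mqtt(use_reloader=True))
```

Passing the environment as a parameter keeps the decision easy to check without actually starting Flask.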

Code correction and operation check

Based on the above understanding of debug mode, I made the following changes.

- Check the environment variable 'WERKZEUG_RUN_MAIN' before making the MQTT connection, and connect only if it is defined.
- Disconnect MQTT before the process ends and wait for the disconnection to complete.
- In addition, extend on_message() to track the status of the desired device and report that status in response to HTTP requests.

I modified hello.py again as follows. The second fix addresses reloading: when the parent process detects a code change, it stops child process 1 and starts child process 2. If child process 2 starts before child process 1 has finished disconnecting its MQTT session, the same duplicate client id problem would occur, so I added the wait just in case.
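The idea of waiting for the disconnection to complete can be sketched on its own, without a broker. This is a minimal sketch rather than the code used in hello.py: it swaps the threading.Condition used there for a threading.Event with a timeout, so that shutdown cannot block forever if the callback never fires. The class name DisconnectWaiter is hypothetical.

```python
import threading


class DisconnectWaiter(object):
    """Lets the main thread block until on_disconnect has been called."""

    def __init__(self):
        self._done = threading.Event()

    def on_disconnect(self, client, userdata, rc):
        # Same signature as the on_disconnect callback in this article.
        self._done.set()

    def wait(self, timeout=5.0):
        # True if the callback ran, False if the timeout expired first.
        return self._done.wait(timeout)


if __name__ == '__main__':
    waiter = DisconnectWaiter()
    # Simulate the callback firing shortly after disconnect is requested.
    threading.Timer(0.1, waiter.on_disconnect, args=(None, None, 0)).start()
    print(waiter.wait(timeout=2.0))
```

With paho-mqtt, the waiter's on_disconnect method would be assigned to client.on_disconnect, and waiter.wait() would be called after client.disconnect().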

hello.py


from flask import Flask
import os
import threading
import json

import paho.mqtt.client as mqtt
from datetime import datetime

def on_connect(client, userdata, flags, respons_code):
    client.on_message = on_message
    client.subscribe('iot-2/type/+/id/+/mon')
    print(datetime.now().strftime("%Y/%m/%d %H:%M:%S") + ": mqtt connected")

cond = threading.Condition()
notified = False

def on_disconnect(client, userdata, rc):
    global notified
    print(datetime.now().strftime("%Y/%m/%d %H:%M:%S") + ": mqtt disconnected")
    with cond:
        notified = True
        cond.notify()

status = 'Unknown'

def on_message(client, userdata, msg):
    global status
    response_json = msg.payload.decode("utf-8")
    response_dict = json.loads(response_json)
    if(response_dict["ClientID"] == 'Specify the client id of the device gateway you want to track here'):
        if( response_dict["Action"] == "Disconnect" ):
            status = "Disconnected"
        elif( response_dict["Action"] == "Connect" ):
            status = "Connected"

# Connect only in the reloader child process to avoid a duplicate client id.
if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
    client = mqtt.Client(client_id='a:oooooo:appl1', protocol=mqtt.MQTTv311)
    client.username_pw_set('a-oooooo-kkkkkkkkkk', password='tttttttttttttttttt')
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.connect('de.messaging.internetofthings.ibmcloud.com', 1883, 120)
    client.loop_start()

app = Flask(__name__, static_url_path='')
port = int(os.getenv('PORT', 8000))

@app.route('/')
def root():
    return "Status: " + status

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=port, debug=True)

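    # app.run() has returned, so the server is shutting down.
    # Only the reloader child process opened an MQTT connection.
    if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
        client.loop_stop()
        client.disconnect()
        # Block until on_disconnect() signals that the session is closed.
        with cond:
            while not notified:
                cond.wait()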

I modified the code as above and tried running it again.

# python hello.py
 * Serving Flask app "hello" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: on
 * Running on http://0.0.0.0:8000/ (Press CTRL+C to quit)
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 210-659-291
2020/05/13 23:04:28: mqtt connected
127.0.0.1 - - [13/May/2020 23:04:38] "GET / HTTP/1.1" 200 -

The repeated connecting and disconnecting has stopped!

When I make an HTTP request with curl,

# curl localhost:8000
Status: Disconnected

It seems that the processing of MQTT messages is also working well.

Try connecting the registered IoT device.

This time, I use mosquitto_sub, subscribing to Device commands, as a stand-in for the IoT device. For details on how to set up and connect, see the article linked below. https://qiita.com/kuraoka/items/5380f6b5e97e8cd1ad98

# mosquitto_sub -h de.messaging.internetofthings.ibmcloud.com -u use-token-auth -P "tttttttttttttttttt" -i (client id monitored by hello.py) -t 'iot-2/type/(type)/id/(id)/cmd/control/fmt/json'

With the stand-in device connected over MQTT as described above, try making an HTTP request with curl again.

# curl localhost:8000
Status: Connected

This time the Connected status is returned. When mosquitto_sub made its MQTT connection, a Device Status message was sent to hello.py, and the on_message() function updated the status variable.
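The status update performed in on_message() can be sketched as a pure function, which makes it easy to check without a broker. The function name next_status and the client id 'd:oooooo:type1:dev1' are hypothetical. The payload shape, a JSON object with "ClientID" and "Action" keys, is taken from the code in this article, and real messages may carry additional fields.

```python
import json


def next_status(payload, tracked_client_id, current='Unknown'):
    """Return the new status for the tracked device.

    payload is the raw bytes of a Device Status message. Messages for
    other clients, or with an unrecognized action, leave the status as is.
    """
    message = json.loads(payload.decode('utf-8'))
    if message.get('ClientID') != tracked_client_id:
        return current
    action = message.get('Action')
    if action == 'Connect':
        return 'Connected'
    if action == 'Disconnect':
        return 'Disconnected'
    return current


# Hypothetical message for a hypothetical device client id.
sample = json.dumps(
    {'ClientID': 'd:oooooo:type1:dev1', 'Action': 'Connect'}).encode('utf-8')
print(next_status(sample, 'd:oooooo:type1:dev1'))  # Connected
```

Using .get() instead of indexing means a message that lacks one of the keys is ignored rather than raising KeyError inside the callback.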

This is all as expected.

Summary

This was my first time using Flask. I started from the sample code, reduced it to a minimum, and added the part I wanted to implement, and I was tripped up where I did not expect it. You could say that this kind of trial and error is itself a way of learning, but I feel I should have studied Flask first and used it afterward.

Watson IoT Platform is a powerful cloud service for managing and controlling IoT devices and collecting data.

Uploading the data collected by IoT devices to the cloud is the foundation of such a system, but I also wanted to implement a cloud application that monitors the status of IoT devices and controls them as needed, which is what led to this study. Now that I have confirmed the minimum operation of the status monitoring part, I would like to go on to investigate controlling IoT devices by sending Device commands.
