Here's a summary of what I was addicted to trying to implement Watson IoT Platform applications in Flask. Because I didn't understand Flask very well, I regretted that I had to understand and use it properly.
Several programming interfaces are provided for Watson IoT Platform applications, but this time I created an application that subscribes to Device Status that reports the connection status of IoT devices using MQTT.
Eventually I wanted to push the code to IBM Cloud Foundry so I decided to implement it in Flask, but I should have a solid understanding of Flask here ...
Python 2.7 paho-mqtt 1.5.0 Flask 1.1.2
Create a public application by referring to the following article. https://cloud.ibm.com/docs/cloud-foundry-public?topic=cloud-foundry-public-getting-started
Get the sample code that comes out in the above steps from git and modify it so that it can respond to HTTP requests for the time being.
hello.py
from flask import Flask
import os
app = Flask(__name__, static_url_path='')
port = int(os.getenv('PORT', 8000))
@app.route('/')
def root():
return 'Hello, world'
if __name__ == '__main__':
app.run(host='0.0.0.0', port=port, debug=True)
Try running this program locally.
# python hello.py
* Serving Flask app "hello" (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: on
* Running on http://0.0.0.0:8000/ (Press CTRL+C to quit)
* Restarting with stat
* Debugger is active!
* Debugger PIN: 210-659-291
I will try to access it with curl for confirmation. It seems that you can successfully respond to HTTP requests on port 8000.
# curl localhost:8000
Hello, world
You now have an application that can at least respond to HTTP requests.
Watson IoT Platform has an MQTT client for publishing information collected by IoT devices (there are two methods, called devices and gateways, respectively), and it receives and processes the data collected by IoT devices and checks the status of IoT devices. Two types of client connection are possible: MQTT client (application) to operate and.
This time, connect as an application and subscribe to the topic to receive notification of device status changes managed by Watson IoT Platform. More information on the connection can be found at the link below. https://www.ibm.com/support/knowledgecenter/en/SSQP8H/iot/platform/applications/mqtt.html
To get started, create an API key and token to make an MQTT connection to Watson IoT Platform as an application. Please refer to the link below to proceed. https://www.ibm.com/support/knowledgecenter/en/SSQP8H/iot/platform/applications/app_dev_index.html
By making an MQTT connection using the created API key and token, you should be able to be notified of Device Status changes each time an IoT device connects to or disconnects from the Watson IoT Platform.
Add the code to connect to Watson IoT Platform with the created API key and token to hello.py using Flask.
In order to make an MQTT connection to Watson IoT Platform, you need to specify the client id, userid, and password. Create by concatenating the organization id, API key and token assigned by Watson IoT Platform and each in the specified format. This format is described in detail in MQTT authentication at the link below. https://www.ibm.com/support/knowledgecenter/en/SSQP8H/iot/platform/applications/mqtt.html
In the sample code below, the organization id is'oooooo', the API key is'kkkkkkkkkk', and the token is'tttttttttttttttttt', but you need to change it to the value provided by Watson IoT Platform when you run the code.
hello.py
from flask import Flask
import os
import paho.mqtt.client as mqtt
from datetime import datetime
def on_connect(client, userdata, flags, respons_code):
client.on_message = on_message
client.subscribe('iot-2/type/+/id/+/mon')
print(datetime.now().strftime("%Y/%m/%d %H:%M:%S") + ": mqtt connected")
def on_disconnect(client, userdata, rc):
print(datetime.now().strftime("%Y/%m/%d %H:%M:%S") + ": mqtt disconnected")
def on_message(client, userdata, msg):
print(datetime.now().strftime("%Y/%m/%d %H:%M:%S") + ": mqtt message arrived")
client = mqtt.Client(client_id='a:oooooo:appl1', protocol=mqtt.MQTTv311)
client.username_pw_set('a-oooooo-kkkkkkkkkk', password='tttttttttttttttttt')
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.connect('de.messaging.internetofthings.ibmcloud.com', 1883, 120)
client.loop_start()
app = Flask(__name__, static_url_path='')
port = int(os.getenv('PORT', 8000))
@app.route('/')
def root():
return "Hello, world"
if __name__ == '__main__':
app.run(host='0.0.0.0', port=port, debug=True)
I decided to prepare such a code and check the operation. The expectation is that you will be able to confirm receipt of MQTT messages while responding to HTTP requests from port 8000.
But when you actually run the code ...
# python hello.py
* Serving Flask app "hello" (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: on
* Running on http://0.0.0.0:8000/ (Press CTRL+C to quit)
* Restarting with stat
2020/05/13 18:13:26: mqtt connected
* Debugger is active!
* Debugger PIN: 210-659-291
2020/05/13 18:13:26: mqtt message arrived
2020/05/13 18:13:27: mqtt disconnected
2020/05/13 18:13:27: mqtt connected
2020/05/13 18:13:27: mqtt message arrived
2020/05/13 18:13:28: mqtt connected
2020/05/13 18:13:28: mqtt disconnected
2020/05/13 18:13:29: mqtt message arrived
2020/05/13 18:13:30: mqtt disconnected
2020/05/13 18:13:30: mqtt connected
2020/05/13 18:13:30: mqtt message arrived
2020/05/13 18:13:31: mqtt disconnected
2020/05/13 18:13:31: mqtt connected
In this way, although the connection with MQTT was established once, it was cut off immediately. After that, paho-mqtt receives an unexpected session disconnection and automatically reconnects, but it also disconnects immediately, and then reconnects and disconnects repeatedly.
If the API key or token is wrong, MQTT connection should fail in the first place, and I tried adjusting the message interval to see if MQTT keepalive timed out, but this also does not lead to improvement. It was.
I wondered what was wrong with the communication path and left it for a few days, but the symptoms did not change.
I was at a loss and posted a question on stackoverflow, and two people advised me that the client id might be duplicated.
Certainly you cannot connect to MQTT Broker with the same client id. I thought it was true, but I can't think of any reason for the duplication. hello.py is running only one process at a time. Also, the rule is that the client id is generated by concatenating the organization id and the API key, but the organization id does not divert the API key for other purposes. For some reason, I thought that old connection information might remain on Watson IoT Platform and I could not connect with it, so I tried to generate a new API key, but the symptom does not change even if I use it. did.
Apparently I was giving up thinking that the duplicate client id wasn't the cause, but when I looked at the output of hello.py again, I saw the string'Restarting with stat'.
By the way, what is this?
I've been suspicious of the MQTT part so far, but I decided to investigate this mysterious message of Flask.
result,
--When Flask is run with debug mode enabled, it detects changes in the code that makes up the application and automatically restarts the application. --At this time, Flask implicitly starts the child process, the parent process is in charge of monitoring code changes and restarting the child process, and the child process processes HTTP requests as a WEB application.
I have come to understand that.
When I check it,
# ps -f | grep hello.py
501 20745 2576 0 10:39PM ttys005 0:00.35 /System/Library/Frameworks/Python.framework/Versions/2.7/Resources/Python.app/Contents/MacOS/Python hello.py
501 20748 20745 0 10:39PM ttys005 0:00.36 /System/Library/Frameworks/Python.framework/Versions/2.7/Resources/Python.app/Contents/MacOS/Python /Users/(abridgement)/hello.py
There are certainly two parent-child hello.py processes running.
In this situation, the duplicate cllient id is convincing. The parent process (pid: 20745) and the child process (pid: 20748) use the same client id at about the same time to enter the MQTT connection. Well, I see.
So how do you avoid duplicate client ids? It's easy to disable debug mode, but it's a shame that you can't use the useful features that are available. Further investigation revealed that the parent process was'WEERKZUG_RUN_MAIN' before invoking the child process. It seems that it is okay if you check this environment variable and do not make MQTT connection in the parent process.
Based on the above understanding of debug mode
--Check the environment variable'WEERKZUG_RUN_MAIN' before connecting MQTT, and connect only if this environment variable is defined. --Disconnect MQTT before the end of the process and wait for the disconnection to complete. --In addition, add on_message () processing to track the status of the desired device and report the status in response to HTTP requests.
I modified hello.py again like this. The second fix is that when the parent process detects a code change and reloads, it stops the child process 1 and starts a new child process 2, but does not wait for the child process 1 to disconnect the MQTT connection. The same thing will happen if child process 2 is started, so I decided to add it just in case.
hello.py
from flask import Flask
import os
import threading
import json
import paho.mqtt.client as mqtt
from datetime import datetime
def on_connect(client, userdata, flags, respons_code):
client.on_message = on_message
client.subscribe('iot-2/type/+/id/+/mon')
print(datetime.now().strftime("%Y/%m/%d %H:%M:%S") + ": mqtt connected")
cond = threading.Condition()
notified = False
def on_disconnect(client, userdata, rc):
global notified
print(datetime.now().strftime("%Y/%m/%d %H:%M:%S") + ": mqtt disconnected")
with cond:
notified = True
cond.notify()
status = 'Unknown'
def on_message(client, userdata, msg):
global status
response_json = msg.payload.decode("utf-8")
response_dict = json.loads(response_json)
if(response_dict["ClientID"] == 'Specify the client id of the device gateway you want to track here'):
if( response_dict["Action"] == "Disconnect" ):
status = "Disconnected"
elif( response_dict["Action"] == "Connect" ):
status = "Connected"
if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
client = mqtt.Client(client_id='a:oooooo:appl1', protocol=mqtt.MQTTv311)
client.username_pw_set('a-oooooo-kkkkkkkkkk', password='tttttttttttttttttt')
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.connect('de.messaging.internetofthings.ibmcloud.com', 1883, 120)
client.loop_start()
app = Flask(__name__, static_url_path='')
port = int(os.getenv('PORT', 8000))
@app.route('/')
def root():
return "Status: " + status
if __name__ == '__main__':
app.run(host='0.0.0.0', port=port, debug=True)
if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
client.loop_stop()
client.disconnect()
with cond:
if( not notified ):
cond.wait()
I modified the code as above and tried running it again.
# python hello.py
* Serving Flask app "hello" (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: on
* Running on http://0.0.0.0:8000/ (Press CTRL+C to quit)
* Restarting with stat
* Debugger is active!
* Debugger PIN: 210-659-291
2020/05/13 23:04:28: mqtt connected
127.0.0.1 - - [13/May/2020 23:04:38] "GET / HTTP/1.1" 200 -
Repeated connection / disconnection has stopped!
When I make an HTTP request with curl,
# curl localhost:8000
Status: Disconnected
It seems that the processing of MQTT messages is also working well.
Try connecting the registered IoT device.
This time, we will use mosquitto_sub to subscribe to the Device command to replace the IoT device. For details on how to set up and connect, see the article linked below. https://qiita.com/kuraoka/items/5380f6b5e97e8cd1ad98
# mosquitto_sub -h de.messaging.internetofthings.ibmcloud.com -u use-token-auth -P "tttttttttttttttttt" -i (hello.Client id monitored by py) -t 'iot-2/type/(type)/id/(id)/cmd/control/fmt/json'
With the MQTT connection from the IoT device as described above, try making an HTTP request with curl again.
# curl localhost:8000
Status: Connected
This time the Connected status is returned. By MQTT connection with mosquitto_sub, it seems that the Device status message was sent to hello.py and the status variable was changed in the on_message () function.
This is all as expected.
I tried using Flask for the first time this time. When I tried to create the minimum code from the sample code and add the part I wanted to implement from there, it seemed that my foot was scooped up unexpectedly. There is also the idea that I studied for this kind of trial and error, but it seems that I should have studied it first and then used it.
Watson IoT Platform is a powerful cloud service for managing and controlling IoT devices and collecting data.
The part that uploads the data collected by the IoT device to the cloud is the basis of the system, but I want to implement an application that monitors the status of the IoT device and controls it as needed on the cloud. I thought about it, and thought of this study. Now that we have confirmed the minimum operation of the status monitoring part, we would like to expand our reach to investigate the control of IoT devices by sending Device commands.
Recommended Posts