- I want to measure time-series environmental data with a SensorTag
- I have a Raspberry Pi 3 lying idle, so I want to put it to use
- I want to try some AWS PaaS: DynamoDB, Lambda, API Gateway
- I want to monitor and operate it from Slack (I don't want to write a UI/UX)

→ I ended up with something that works, so here is a summary.
The figure was created at www.draw.io. Super convenient.
I used TI's SensorTag CC2650stk. It has many sensors such as acceleration, temperature, humidity, illuminance, gyro, geomagnetism, and barometric pressure.
The SensorTag is basically battery powered, but power consumption rises when the sensors are kept busy. This time I wanted to read every sensor continuously as a time series, so I attached the Debugging Platform to the SensorTag and connected it to the Raspberry Pi 3 by USB. Power comes over the USB cable, which allows long measurements without worrying about the battery.
I referred to these sites: http://kinokotimes.com/2017/03/07/usb-control-method-by-raspberry-pi/ and [BeagleBoneBlackBox_USB Power Control](http://www.si-linux.co.jp/techinfo/index.php?BeagleBoneBlackBox_USB%E9%9B%BB%E6%BA%90%E5%88%B6%E5%BE%A1)
An app on the Raspberry Pi collects the sensor data. It occasionally crashes and gets restarted. When that happens, the SensorTag's power is cycled as well, just in case. Whether or not this is strictly necessary, it puts the SensorTag in the same startup state every time.
usbrefresh.sh
#! /bin/sh
hub-ctrl -h 0 -P 2 -p 0
sleep 1
hub-ctrl -h 0 -P 2 -p 1
I referred to these sites: http://taku-make.blogspot.jp/2015/02/blesensortag.html and http://dev.classmethod.jp/hardware/raspberrypi/sensortag-raspberry-pi-2-script/ There are various BLE libraries for collecting data from a SensorTag. bluepy was the easiest to get working, so that is what I use.
I used the AWS SDK sample code almost as is (Apache 2.0 license). I changed the parts that read the data over BLE and build the JSON. The code basically assumes that communication uses X.509 credentials.
sendMQTT.py
'''
/*
* Copyright 2010-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
'''
###-----------------------------------------------------------------------------
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import sys
import logging
import time
import argparse
from btle import UUID, Peripheral, DefaultDelegate, AssignedNumbers
import struct
import math
from sensortag import SensorTag, KeypressDelegate
import json
from datetime import datetime
###-----------------------------------------------------------------------------
# Custom MQTT message callback
def customCallback(client, userdata, message):
    print("--------------")
    print("Received : " + message.payload)
    print("from topic: " + message.topic)
    print("--------------\n\n")
###-----------------------------------------------------------------------------
# Read in command-line parameters
parser = argparse.ArgumentParser()
### AWS!
parser.add_argument("-e", "--endpoint", action="store", required=True, dest="host", help="Your AWS IoT custom endpoint")
parser.add_argument("-r", "--rootCA", action="store", dest="rootCAPath", default="root-CA.crt", help="Root CA file path")
parser.add_argument("-c", "--cert", action="store", dest="certificatePath", default="certificate.pem.crt",help="Certificate file path")
parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", default="private.pem.key",help="Private key file path")
parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False,
help="Use MQTT over WebSocket")
parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="Raspi_1", help="Targeted client id")
parser.add_argument("-t", "--topic", action="store", dest="topic", default="etl/room", help="Targeted topic")
### SensorTag!
parser.add_argument('-n', action='store', dest='count', default=0,
type=int, help="Number of times to loop data")
parser.add_argument('-T',action='store', dest="sleeptime", type=float, default=5.0, help='time between polling')
parser.add_argument('-H', action='store', dest="taghost", help='MAC of BT device')
parser.add_argument('--all', action='store_true', default=True)
args = parser.parse_args()
host = args.host
rootCAPath = args.rootCAPath
certificatePath = args.certificatePath
privateKeyPath = args.privateKeyPath
useWebsocket = args.useWebsocket
clientId = args.clientId
topic = args.topic
sleeptime = args.sleeptime
deviceID = args.clientId
###=============================================================================
if args.useWebsocket and args.certificatePath and args.privateKeyPath:
    parser.error("X.509 cert authentication and WebSocket are mutual exclusive. Please pick one.")
    exit(2)
if not args.useWebsocket and (not args.certificatePath or not args.privateKeyPath):
    parser.error("Missing credentials for authentication.")
    exit(2)
###=============================================================================
# Enabling selected sensors
print('Connecting to ' + args.taghost)
tag = SensorTag(args.taghost)
if args.all:
    tag.IRtemperature.enable()
    tag.humidity.enable()
    tag.barometer.enable()
    tag.accelerometer.enable()
    tag.magnetometer.enable()
    tag.gyroscope.enable()
    tag.battery.enable()
    tag.keypress.enable()
    tag.setDelegate(KeypressDelegate())
    tag.lightmeter.enable()
# Give the sensors a moment to initialize before the first read
time.sleep(1.0)
###=============================================================================
# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)
###=============================================================================
# Init AWSIoTMQTTClient
myAWSIoTMQTTClient = None
if useWebsocket:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True)
    myAWSIoTMQTTClient.configureEndpoint(host, 443)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath)
else:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
    myAWSIoTMQTTClient.configureEndpoint(host, 8883)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)
# AWSIoTMQTTClient connection configuration
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing
myAWSIoTMQTTClient.configureDrainingFrequency(0.5) # Draining: 2 Hz
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10) # 10 sec
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5) # 5 sec
# Connect and subscribe to AWS IoT
myAWSIoTMQTTClient.connect()
#myAWSIoTMQTTClient.subscribe(topic, 1, customCallback)
time.sleep(2)
###=============================================================================
# Publish to the same topic in a loop forever
loopCount = 0
Payload = {}
while True:
    Payload['ID'] = str(deviceID)
    ambient_temp, target_temparature = tag.IRtemperature.read()
    Payload["AmbientTemp"] = ambient_temp
    Payload["TargetTemp"] = target_temparature
    ambient_temp, rel_humidity = tag.humidity.read()
    Payload["Humidity"] = rel_humidity
    ambient_temp, pressure_millibars = tag.barometer.read()
    Payload["Barometer"] = pressure_millibars
    Acc_x, Acc_y, Acc_z = tag.accelerometer.read()
    Payload["AccX"] = Acc_x
    Payload["AccY"] = Acc_y
    Payload["AccZ"] = Acc_z
    magnet_x, magnet_y, magnet_z = tag.magnetometer.read()
    Payload["MagnetX"] = magnet_x
    Payload["MagnetY"] = magnet_y
    Payload["MagnetZ"] = magnet_z
    gyro_x, gyro_y, gyro_z = tag.gyroscope.read()
    Payload["GyroX"] = gyro_x
    Payload["GyroY"] = gyro_y
    Payload["GyroZ"] = gyro_z
    Payload["Light"] = tag.lightmeter.read()
    Payload["Batterys"] = tag.battery.read()
    Payload["Count"] = loopCount
    Payload["Datetime"] = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
    # print("try to send!")
    myAWSIoTMQTTClient.publish(topic, json.dumps(Payload), 1)
    # print("end!")
    loopCount += 1
    time.sleep(sleeptime)
The program above sometimes crashes. Debugging it properly might be worthwhile, but it's a Raspberry Pi running Python, so I decided to accept that it will go down now and then. Instead, cron runs a script like the one below once a minute. I could have let sendMQTT.py exit after every measurement and simply launched it from cron each time, but starting Python over and over is heavy, and the Python side also keeps a loop counter. I know it runs about 10,000 iterations without crashing, so cron just acts as a watchdog that checks whether the process is still alive.
By the way, the job is registered with crontab -e.
sendSensorData.sh
#!/bin/sh
ps | grep python
if [ "$?" -eq 0 ]
then
logger "python is exist. exit..."
exit 0
else
logger "start reset sequence..."
sudo usbrefresh.sh
sleep 3
cd /home/pi/deviceSDK
python ./sendMQTT.py -T 59 -H "AA:BB:CC:DD:EE:FF" -e awsiotnoarn.iot.ap-northeast-1.amazonaws.com >/dev/null 2>&1 &
cd
exit 0
fi
The script above passes -T 59, which makes the Python script sleep for 59 seconds on each pass of its infinite loop. The time spent reading data over BLE and talking to AWS varies slightly from pass to pass. A real-time OS with an interrupt every minute might fix that, but with a Raspberry Pi, Raspbian Jessie, and Python I judged that out of reach and accepted the limitation. The result is usable time-series data, but not at exact one-minute intervals: the data shows a few seconds of jitter.
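The jitter comes from sleeping a fixed 59 seconds after work of variable length. One way to reduce it without a real-time OS is to sleep until the next multiple of the period instead. The following is a minimal sketch, not the code used here; `seconds_until_next_tick` and `run_every` are hypothetical names, and `job` stands in for the read-and-publish step.

```python
import time


def seconds_until_next_tick(now, period=60.0):
    """Return how long to sleep so the next wake-up lands on a multiple of `period`."""
    remainder = now % period
    # Exactly on a boundary: wait one full period instead of firing twice.
    return period - remainder if remainder else period


def run_every(job, period=60.0, iterations=None, clock=time.time, sleep=time.sleep):
    """Call `job` once per period, aligned to wall-clock boundaries.

    The sleep length is recomputed on every pass, so the time spent inside
    `job` (BLE reads, MQTT publish) does not accumulate as drift.
    """
    done = 0
    while iterations is None or done < iterations:
        sleep(seconds_until_next_tick(clock(), period))
        job()
        done += 1
```

Each wake-up is still subject to ordinary scheduling latency on the Pi, so this narrows the jitter rather than removing it.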
AWS PaaS
The AWS side is built serverless. This is for learning purposes.
AWS IoT
I referred to this site: http://qiita.com/nsedo/items/c6f33c7cadea7023403f
Everything else follows the AWS SDK as is. The device only builds JSON and publishes it over MQTT, so using AWS IoT took no special effort. The received JSON is put into DynamoDB unchanged.
DynamoDB
In the time-series table, the key is the ID name plus the time information. I heard somewhere that this layout works well for IoT time-series data. The remaining attributes are simply the acquired readings side by side. For later time-series analysis, the DB just accumulates the records without any further structure; as shown above, the JSON is built on the device and stored as is.
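Because the time key is a zero-padded "%Y/%m/%d %H:%M:%S" string, lexicographic order matches chronological order, so a time range can be fetched with a key condition. Below is a minimal sketch of building the arguments for boto3's low-level `query` call. The table name `ETLRoomLog` is hypothetical (the time-series table name is not given here); the key names `ID` and `Datetime` are the ones the Lambda code reads from the stream record.

```python
def build_range_query(device_id, start, end, table_name="ETLRoomLog"):
    """Build keyword arguments for boto3's low-level DynamoDB client.query()."""
    return {
        "TableName": table_name,  # hypothetical table name
        # Placeholders (#id, #dt) sidestep any clash with DynamoDB reserved words.
        "KeyConditionExpression": "#id = :id AND #dt BETWEEN :start AND :end",
        "ExpressionAttributeNames": {"#id": "ID", "#dt": "Datetime"},
        "ExpressionAttributeValues": {
            ":id": {"S": device_id},
            ":start": {"S": start},
            ":end": {"S": end},
        },
    }


params = build_range_query("Raspi_1", "2017/09/06 00:00:00", "2017/09/06 23:59:59")
# With AWS credentials configured, the call would look like:
#   import boto3
#   items = boto3.client("dynamodb").query(**params)["Items"]
```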
In the ETLRoom table, which holds the room state, the key is the ID name only. The values are:
- Data: the last JSON received, as is
- isNight: whether the light is currently judged to be on or off. Only changes in the Light reading are handled this time, so this is the only state kept. The state is pushed into DynamoDB so that the Lambda side stays stateless.
For reference, written as JSON the data looks like the following. In the time-series DB, only the attributes under Data are stored, side by side.
room.json
{
"Data": {
"AccX": {
"N": "0.915283203125"
},
"AccY": {
"N": "-0.129150390625"
},
"AccZ": {
"N": "0.48974609375"
},
"AmbientTemp": {
"N": "31.78125"
},
"Barometer": {
"N": "1006.03"
},
"Batterys": {
"N": "100"
},
"Count": {
"N": "261"
},
"Datetime": {
"S": "2017/09/06 13:31:35"
},
"GyroX": {
"N": "-4.0740966796875"
},
"GyroY": {
"N": "1.85394287109375"
},
"GyroZ": {
"N": "1.983642578125"
},
"Humidity": {
"N": "40.95458984375"
},
"ID": {
"S": "Raspi_1"
},
"Light": {
"N": "57.88"
},
"MagnetX": {
"N": "38.83418803418803"
},
"MagnetY": {
"N": "-19.34212454212454"
},
"MagnetZ": {
"N": "-17.842735042735043"
},
"TargetTemp": {
"N": "23.78125"
}
},
"ID": "Raspi_1",
"isNight": "0"
}
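Each reading in the record above is wrapped in a DynamoDB type tag ("N" for number, "S" for string), and numbers arrive as strings. A small helper can flatten that into plain Python values for analysis. This is a sketch that handles only the two tags appearing in this record; `unwrap` is a hypothetical helper name.

```python
from decimal import Decimal


def unwrap(typed):
    """Convert {"Light": {"N": "57.88"}, "ID": {"S": "Raspi_1"}} into plain values."""
    plain = {}
    for name, value in typed.items():
        if "N" in value:
            # Decimal keeps the exact digits that DynamoDB stored.
            plain[name] = Decimal(value["N"])
        elif "S" in value:
            plain[name] = value["S"]
        else:
            raise ValueError("unsupported type tag for %s: %r" % (name, value))
    return plain


sample = {
    "Light": {"N": "57.88"},
    "Count": {"N": "261"},
    "Datetime": {"S": "2017/09/06 13:31:35"},
}
print(unwrap(sample))
```

boto3 also ships a general-purpose `TypeDeserializer` in `boto3.dynamodb.types` that covers the remaining type tags.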
Lambda
It's a bit embarrassing because many parts are not written properly... In particular, I never reached a conclusion about the exception handling: for now the exception details are posted to Slack, but I'm not sure what should be done in such cases. I don't want Lambda to run again and again when an exception occurs, or to post the same error to Slack repeatedly, but I left that as future work. I don't know Lambda's specifications well enough yet.
The Room table that holds the state of the room is also a fairly rough design:
- Key: the Raspberry Pi ID, here "Raspi_1"
- Data: all SensorTag data, exactly as received
- Light: whether the current state is judged to be "on" or "off"

slackpost
This Lambda receives the data from the DynamoDB stream. It gets the previous Light value from the ETLRoom table, compares it with the current one to determine whether the light was turned on or off, and posts to Slack if there is a change.
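For a DynamoDB stream trigger, an exception that escapes the handler causes Lambda to retry the same batch, which is what leads to repeated error posts. One possible approach, sketched below under that assumption, is to handle each record separately, send a single summary of the failures, and return normally so the batch is not retried. `process_records`, `handle_record`, and `notify` are hypothetical names; the trade-off is that failed records are dropped rather than retried.

```python
import traceback


def process_records(records, handle_record, notify):
    """Process every record; report all failures in one message instead of raising."""
    errors = []
    for index, record in enumerate(records):
        try:
            handle_record(record)
        except Exception:
            # Remember the failure and keep going with the remaining records.
            errors.append("record %d:\n%s" % (index, traceback.format_exc()))
    if errors:
        notify("%d of %d records failed\n%s" % (len(errors), len(records), "\n".join(errors)))
    return len(errors)


# Stand-ins for the real per-record update and the Slack post,
# so that the sketch runs locally.
def demo_handle(record):
    if record.get("bad"):
        raise ValueError("broken record")


sent = []
failed = process_records([{"bad": False}, {"bad": True}, {}], demo_handle, sent.append)
print(failed, len(sent))
```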
slackpost.py
# coding : utf-8
import json
import datetime
import requests
import boto3
LIGHT_TAG='Light'
#===============================================================================
# Slack Incomming Webhooks URL
SLACL_POST_URL = "https://hooks.slack.com/services/XXXXX/YYYYY/ZZZZZ"
# Post to Slack
def PostSlack(message, icon=':ghost:', username='ETLBot', channel="#room"):
    Dict_Payload = {
        "text": message,
        "username": username,
        "icon_emoji": icon,
        "channel": channel,
    }
    return requests.post(SLACL_POST_URL, data=json.dumps(Dict_Payload))
#-------------------------------------------------------------------------------
def Check_LightChanges(new, old, IsNight):
    Change2Night = None
    Change2Morining = None
    print("new:" , new, ", old:", old, ", IsNight:", IsNight)
    if (IsNight=='1') and (new > (old + 50)):
        Change2Morining = True
        IsNight = '0'
    elif (IsNight=='0') and (new < 10):
        Change2Night = True
        IsNight = '1'
    # Down -> UP
    if Change2Morining:
        message = ":smiley: Light is Turned On. Good Morning! :smiley:"
        icon = ":smiley:"
    # UP -> Down
    elif Change2Night:
        message = ":ghost: Light is Turned Down. Good Bye! :ghost:"
        icon = ":ghost:"
    else:
        return IsNight
    PostSlack(message, icon=icon)
    return IsNight
#-------------------------------------------------------------------------------
table = None
def update_table(data):
    ID = data['ID']['S']
    # Access to ETLRoom Table
    global table
    if not table:
        table = boto3.resource('dynamodb').Table('ETLRoom')
    response = table.get_item(Key={'ID': ID})
    # get_item omits 'Item' when no record exists yet for this ID
    item = response.get('Item')
    if item:
        #PostSlack(json.dumps(item))
        light = round(float(item['Data'][LIGHT_TAG]['N']))
        IsNight = item['IsNight']
    else:
        # First run: no previous reading. Assumption: start in the "light on" state.
        light = 0
        IsNight = '0'
    IsNight = Check_LightChanges(round(float(data[LIGHT_TAG]['N'])), light, IsNight)
    # Update Room Table
    response = table.put_item(
        Item={
            "ID": ID,
            "Data" : data,
            "IsNight": IsNight
        }
    )
    return 0
#-------------------------------------------------------------------------------
def lambda_handler(event, context):
    try:
        for record in event['Records']:
            dynamodb = record['dynamodb']
            keys = dynamodb['Keys']
            data = dynamodb['NewImage']
            # Keys are "ID" and "Datetime".
            id = keys['ID']['S']
            datetime = keys['Datetime']['S']
            print("ID:", id, "/ Date:", datetime)
            update_table(data)
    except Exception as e:
        import traceback
        message = traceback.format_exc()
        print(message)
        PostSlack('Meets Exception!\n' + message)
        raise e
    return 0
#===============================================================================
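The decision in Check_LightChanges is a small two-state machine with a different threshold for each direction: dark to bright needs a jump of more than 50 over the previous reading, and bright to dark needs a reading below 10. Separating the decision from the Slack call makes it easy to try out locally. The following is a sketch with the same thresholds; `next_light_state` is a hypothetical name and not part of the deployed function.

```python
def next_light_state(is_night, new, old, rise=50, dark=10):
    """Return (new_is_night, event) where event is 'on', 'off' or None.

    is_night uses the same '1'/'0' strings that are stored in the ETLRoom table.
    """
    if is_night == '1' and new > old + rise:
        return '0', 'on'      # dark -> bright
    if is_night == '0' and new < dark:
        return '1', 'off'     # bright -> dark
    return is_night, None     # no change worth reporting


state = '0'
previous = 58
for reading in [57, 3, 4, 120, 118]:
    state, event = next_light_state(state, reading, previous)
    if event:
        print(reading, "->", event)
    previous = reading
```

Since `old` is always the immediately preceding reading, a room that brightens gradually, never jumping by more than 50 between two consecutive readings, would not be reported as "on" with these thresholds.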
getroomenv
getroomenv.py
# coding : utf-8
import json
import requests
import boto3
# Slack Incomming Webhooks URL
SLACL_POST_URL = "https://hooks.slack.com/services/XXXXX/YYYYY/ZZZZZ"
#===============================================================================
def MakeStr(data, key, round_n):
    return str(round(float(data[key]['N']), round_n))
table = None
def GetRoomEnv(id, isAll):
    global table
    if not table:
        table = boto3.resource('dynamodb').Table('ETLRoom')
    response = table.get_item(
        Key={
            'ID': id
        }
    )
    data = response['Item']['Data']
    light = MakeStr(data, 'Light', 1)
    temp = MakeStr(data, 'TargetTemp', 1)
    humid = MakeStr(data, 'Humidity', 1)
    balo = MakeStr(data, 'Barometer', 1)
    time = data['Datetime']['S']
    message = "" \
        + "The current temperature is " + temp + " degrees and the humidity is " + humid + " %.\n" \
        + "The brightness is " + light + " lux and the barometric pressure is " + balo + " hPa.\n" \
        + "(Measured at " + time + " :bar_chart:)"
    # If the ALL argument was given, dump all data instead
    if isAll:
        message = ""
        for d in data:
            s = str(d)
            v = data[s]
            if "N" in v:
                message += s + ":" + v["N"] + "\n"
            else:
                message += s + ":" + v["S"] + "\n"
    return message
#-------------------------------------------------------------------------------
# POST to Slack
def PostSlack(message):
    Dict_Payload = {
        "text": message,
        "username": 'NowBot',
        "icon_emoji": ":clock3:",
        "channel": '#room',
    }
    return requests.post(SLACL_POST_URL, data=json.dumps(Dict_Payload))
#-------------------------------------------------------------------------------
def lambda_handler(event, context):
    isAll = False
    try:
        # Extract the words between "text=" and "&trigger_word=" from the form body
        tri1 = 'text='
        tri2 = 'trigger_word='
        body = event['body']
        tag = body[body.find(tri1)+len(tri1):body.find(tri2)-1]
        taglist = tag.split("+")
        for word in taglist:
            if "ALL" in word:
                isAll = True
    except Exception:
        # No usable body: fall back to the short summary
        pass
    try:
        message = GetRoomEnv('Raspi_1', isAll)
    except Exception as e:
        import traceback
        message = traceback.format_exc()
        print(message)
        PostSlack('Meets Exception!\n' + message)
        raise e
    PostSlack(message)
    return 0
#===============================================================================
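Slack's Outgoing WebHooks send the request body as URL-encoded form data, which is why the handler searches for `text=` and `trigger_word=`. The Python 3 standard library can parse that format directly. The following is a sketch, assuming `event['body']` holds the raw form string as in the handler above; `wants_all` is a hypothetical helper name.

```python
from urllib.parse import parse_qs


def wants_all(body):
    """Return True when the posted text contains the word ALL."""
    fields = parse_qs(body or "")
    # parse_qs decodes '+' and %XX escapes and returns a list per field.
    text = fields.get("text", [""])[0]
    return "ALL" in text.split()


print(wants_all("token=abc&text=now+ALL&trigger_word=now"))
print(wants_all("token=abc&text=now&trigger_word=now"))
```

Unlike the substring test in the handler, this matches ALL only as a whole word, and it does not depend on the order of the form fields.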
API Gateway
Slack
I added Incoming WebHooks and Outgoing WebHooks under Custom Integrations. With that, Slack serves as the UI. Super convenient.
Incoming WebHooks
A URL of the form https://hooks.slack.com/services/XXXXXXX/YYYYYYY/ZZZZZZ is generated for posting to Slack; set it in the Lambda code. After that, Lambda just posts to it, and the channel shows whether the light is on or off. The following is an example.
<img width="556" alt="Screenshot 2017-09-05 21.07.36.png" src="https://qiita-image-store.s3.amazonaws.com/0/171122/6fd49e45-4271-642d-f5a4-045833411c15.png">
Looking at this, we can see the following.
- Someone sneaked in and worked quietly on Saturday evening
- Yesterday's early-morning visitor was probably the cleaner, who stayed from 7:10 to 7:53, which is unexpectedly long
- The first team member arrived at 9:12 AM and the last one left at 7:49 PM (healthy!)

Outgoing WebHooks
The trigger word is "now". Send "now" and NowBot reports the current state of the room.
<img width="389" alt="Screenshot 2017-09-05 20.07.48.png" src="https://qiita-image-store.s3.amazonaws.com/0/171122/94beb1e4-359a-45f4-a6bb-01d02354fd63.png">
Sending "now ALL" will dump all the data in the database. For debugging.
<img width="437" alt="Screenshot 2017-09-05 20.11.08.png" src="https://qiita-image-store.s3.amazonaws.com/0/171122/20954556-c401-142f-e816-d6a2ade8f29b.png">
I thought about running the words that follow "now" through natural language processing (such as Amazon Polly) and returning the matching parameter (such as temperature) if there is one, but that will have to wait for another time...
The URL https://XXXXXX.execute-api.ap-northeast-1.amazonaws.com/prod/getroomenv is set as the Outgoing WebHook target. It is served by AWS API Gateway with Lambda behind it, running the getroomenv script.
It took about three days to get everything running end to end. After that I spent about two days polishing it and watching how it behaved. DynamoDB and Lambda still need a bit more ingenuity on my part, but it was good practice. Using Slack for the UI was the right call. Really convenient.
Now that I have collected time-series data, a good next step would be to extract the periodic components and notify Slack when something differs from the usual pattern. Whether it is normal for people to come to work on Saturdays is a separate question...