[PYTHON] Play to notify Slack of environmental data using AWS PaaS from SensorTag via Raspberry Pi3

Overview

--I want to measure time-series environmental data using SensorTag --I'm playing one Raspberry Pi3, so I want to use it --I want to try some AWS PaaS - DynamoDB、Lambda、API Gateway --I want to monitor and operate with Slack (I don't want to write UI / UX)

→ As a result, we have created something that is working, so I will summarize it.

Overall picture

SensorTag2Slack.png The figure was created at www.draw.io. Super convenient.

Data transmission from SensorTag to Raspberry Pi3

I used TI's SensorTag CC2650stk. It has many sensors such as acceleration, temperature, humidity, illuminance, gyro, geomagnetism, and barometric pressure.

SensorTag power supply

The SensorTag is basically battery powered. However, if you move the sensor a lot, the power consumption will be high. This time, I wanted to get all the sensor information in chronological order. Attach Debugging Platform to SensorTag, connect it to RaspberryPi3 via USB, and connect it. Power was supplied by a USB cable. This allows you to measure for a long time without worrying about the battery.

SensorTag power on / off

I referred to this site. http://kinokotimes.com/2017/03/07/usb-control-method-by-raspberry-pi/ [BeagleBoneBlackBox_USB Power Control](http://www.si-linux.co.jp/techinfo/index.php?BeagleBoneBlackBox_USB%E9%9B%BB%E6%BA%90%E5%88%B6%E5%BE% A1)

I collect sensor data with the app on the Raspberry Pi side. Occasionally the app crashes and is restarting. At that time, the power supply of SensorTag is also restarted just in case. Whether it is appropriate or not, this will make the startup state uniform.

usbrefresh.sh


#! /bin/sh
hub-ctrl -h 0 -P 2 -p 0
sleep 1
hub-ctrl -h 0 -P 2 -p 1

Collection with bluepy

I referred to this site. http://taku-make.blogspot.jp/2015/02/blesensortag.html http://dev.classmethod.jp/hardware/raspberrypi/sensortag-raspberry-pi-2-script/ There were various BLE-related libraries for collecting information from sensor tags. bluepy worked the easiest, so now.

Send to AWS IoT

I used the AWS SDK Sample Code as it is. Apache 2.0 license. We have made changes such as the part to be acquired by BLE and the part to create JSON. Basically, I think the code is based on the assumption that communication will be performed using X.509 authentication information.

sendMQTT.py


'''
/*
 * Copyright 2010-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
 '''

###-----------------------------------------------------------------------------

from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import sys
import logging
import time
import argparse
from btle import UUID, Peripheral, DefaultDelegate, AssignedNumbers
import struct
import math
from sensortag import SensorTag, KeypressDelegate
import json
from datetime import datetime

###-----------------------------------------------------------------------------

# Custom MQTT message callback
def customCallback(client, userdata, message):
        print("--------------")
	print("Received  : " + message.payload)
	print("from topic: " + message.topic)
	print("--------------\n\n")

###-----------------------------------------------------------------------------

# Read in command-line parameters
parser = argparse.ArgumentParser()

### AWS!
parser.add_argument("-e", "--endpoint", action="store", required=True, dest="host", help="Your AWS IoT custom endpoint")
parser.add_argument("-r", "--rootCA", action="store", dest="rootCAPath", default="root-CA.crt", help="Root CA file path")
parser.add_argument("-c", "--cert", action="store", dest="certificatePath", default="certificate.pem.crt",help="Certificate file path")
parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", default="private.pem.key",help="Private key file path")
parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False,
                    help="Use MQTT over WebSocket")
parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="Raspi_1", help="Targeted client id")
parser.add_argument("-t", "--topic", action="store", dest="topic", default="etl/room", help="Targeted topic")

### SensorTag!
parser.add_argument('-n', action='store', dest='count', default=0,
        type=int, help="Number of times to loop data")
parser.add_argument('-T',action='store', dest="sleeptime", type=float, default=5.0, help='time between polling')
parser.add_argument('-H', action='store', dest="taghost", help='MAC of BT device')
parser.add_argument('--all', action='store_true', default=True)

args = parser.parse_args()
host = args.host
rootCAPath = args.rootCAPath
certificatePath = args.certificatePath
privateKeyPath = args.privateKeyPath
useWebsocket = args.useWebsocket
clientId = args.clientId
topic = args.topic
sleeptime = args.sleeptime
deviceID = args.clientId

###=============================================================================

if args.useWebsocket and args.certificatePath and args.privateKeyPath:
	parser.error("X.509 cert authentication and WebSocket are mutual exclusive. Please pick one.")
	exit(2)

if not args.useWebsocket and (not args.certificatePath or not args.privateKeyPath):
	parser.error("Missing credentials for authentication.")
	exit(2)

###=============================================================================

# Enabling selected sensors
print('Connecting to ' + args.taghost)
tag = SensorTag(args.taghost)
if args.all:
    tag.IRtemperature.enable()
    tag.humidity.enable()
    tag.barometer.enable()
    tag.accelerometer.enable()
    tag.magnetometer.enable()
    tag.gyroscope.enable()
    tag.battery.enable()
    tag.keypress.enable()
    tag.setDelegate(KeypressDelegate())
    tag.lightmeter.enable()
    time.sleep(1.0)

###=============================================================================

# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

###=============================================================================

# Init AWSIoTMQTTClient
myAWSIoTMQTTClient = None
if useWebsocket:
	myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True)
	myAWSIoTMQTTClient.configureEndpoint(host, 443)
	myAWSIoTMQTTClient.configureCredentials(rootCAPath)
else:
	myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
	myAWSIoTMQTTClient.configureEndpoint(host, 8883)
	myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)

# AWSIoTMQTTClient connection configuration
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1)  # Infinite offline Publish queueing
myAWSIoTMQTTClient.configureDrainingFrequency(0.5)  # Draining: 2 Hz
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10)  # 10 sec
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5)  # 5 sec

# Connect and subscribe to AWS IoT
myAWSIoTMQTTClient.connect()
#myAWSIoTMQTTClient.subscribe(topic, 1, customCallback)
time.sleep(2)

###=============================================================================

# Publish to the same topic in a loop forever
loopCount = 0
Payload = {}
while True:
    Payload['ID'] = str(deviceID)

    ambient_temp, target_temparature = tag.IRtemperature.read()
    Payload["AmbientTemp"] = ambient_temp
    Payload["TargetTemp"] = target_temparature

    ambient_temp, rel_humidity = tag.humidity.read()
    Payload["Humidity"] = rel_humidity

    ambient_temp, pressure_millibars = tag.barometer.read()
    Payload["Barometer"] = pressure_millibars

    Acc_x, Acc_y, Acc_z = tag.accelerometer.read()
    Payload["AccX"] = Acc_x
    Payload["AccY"] = Acc_y
    Payload["AccZ"] = Acc_z

    magnet_x, magnet_y, magnet_z = tag.magnetometer.read()
    Payload["MagnetX"] = magnet_x
    Payload["MagnetY"] = magnet_y
    Payload["MagnetZ"] = magnet_z

    gyro_x, gyro_y, gyro_z = tag.gyroscope.read()
    Payload["GyroX"] = gyro_x
    Payload["GyroY"] = gyro_y
    Payload["GyroZ"] = gyro_z

    Payload["Light"] = tag.lightmeter.read()
    Payload["Batterys"] = tag.battery.read()

    Payload["Count"] = loopCount
    Payload["Datetime"] = datetime.now().strftime("%Y/%m/%d %H:%M:%S")

#    print("try to send!")
    myAWSIoTMQTTClient.publish(topic, json.dumps(Payload), 1)
#    print("end!")
    loopCount += 1
    time.sleep(sleeptime)

Life and death monitoring of Python programs

The above program sometimes fails. It may make sense to debug seriously, It's a Raspberry Pi and it's Python, so I decided it would fall. Run a script like the one below with cron once a minute. You may want to quit sendMQTT.py every time and start it with just cron, It's also heavy to start a messy Python-and There is also a part of Python that counts repetitions. I also know that it works about 10,000 times without falling, so cron is trying to monitor life and death.

By the way, with crontab -e

sendSensorData.sh


#!/bin/sh

ps | grep python
if [ "$?" -eq 0 ]
then
  logger "python is exist.  exit..."
  exit 0
else

  logger "start reset sequence..."
  sudo usbrefresh.sh
  sleep 3

  cd /home/pi/deviceSDK
  python ./sendMQTT.py -T 59 -H "AA:BB:CC:DD:EE:FF" -e awsiotnoarn.iot.ap-northeast-1.amazonaws.com  >/dev/null 2>&1 & 

  cd
  exit 0
fi

Accuracy to operate every cycle

There is a part of the above script that says -T 59. This means sleeping for 59 seconds in an infinite loop in a Python script. There are subtle fluctuations such as the time to acquire data with BLE and the communication time with AWS. It may be possible to use a real-time OS and interrupt every minute, and so on. Since it is composed of Raspberry Pi, raspbian jessie, and Python, it is judged that it is impossible to do so, and it is divisible. You can actually get time-series data, but not exactly every minute. The data shows that there is a few seconds of jitter.

AWS PaaS The AWS side is built without a server. For studying.

AWS IoT I referred to this site. http://qiita.com/nsedo/items/c6f33c7cadea7023403f The rest is still the AWS SDK. MQTT just creates and throws JSON, so I didn't have to worry about using AWS IoT. I received the JSON data and put it to DynamoDB as it is.

DynamoDB

DB for saving time series data

Key is ID name and time information. I've heard somewhere that this method is good when creating time series data with IoT. Others are like arranging the acquired data. When analyzing time series data, we only arrange unstructured data in the DB. As you can see, there is a place where JSON is created and thrown as it is.

DB for saving status

Key is ID name only. Value is --Data: The last JSON received as it is --isNight: State of whether or not it recognizes that the light is on is. This time, we are only dealing with changes in Light information, so we are holding only that state. In order to make the Lambda side stateless, the state is aimed at DynamoDB.

Just in case, if you write it in JSON, it will be such data. For the time series DB, only the following Data items are lined up.

room.json


{
  "Data": {
    "AccX": {
      "N": "0.915283203125"
    },
    "AccY": {
      "N": "-0.129150390625"
    },
    "AccZ": {
      "N": "0.48974609375"
    },
    "AmbientTemp": {
      "N": "31.78125"
    },
    "Barometer": {
      "N": "1006.03"
    },
    "Batterys": {
      "N": "100"
    },
    "Count": {
      "N": "261"
    },
    "Datetime": {
      "S": "2017/09/06 13:31:35"
    },
    "GyroX": {
      "N": "-4.0740966796875"
    },
    "GyroY": {
      "N": "1.85394287109375"
    },
    "GyroZ": {
      "N": "1.983642578125"
    },
    "Humidity": {
      "N": "40.95458984375"
    },
    "ID": {
      "S": "Raspi_1"
    },
    "Light": {
      "N": "57.88"
    },
    "MagnetX": {
      "N": "38.83418803418803"
    },
    "MagnetY": {
      "N": "-19.34212454212454"
    },
    "MagnetZ": {
      "N": "-17.842735042735043"
    },
    "TargetTemp": {
      "N": "23.78125"
    }
  },
  "ID": "Raspi_1",
  "isNight": "0"
}

Lambda It's embarrassing because there are many places that aren't written properly. .. .. The part of Exception, the content is coming to Slack as a place where I can not come to a conclusion while I am worried, I'm not sure what to do in such a case. I don't want to be repeatedly executed when an Exception occurs in Lambda, or to post an error to Slack repeatedly, so I was wondering what to do, but I made it a future task. I don't know the specifications of Lambda either.

Building a Room table that holds the state of the room is also almost appropriate, Key: Raspberry Pi ID, here "Raspi_1" data: All data of SensorTag, Kita guy Light: Is the current state determined to be "on" or "off"? It is a design.

slackpost This is where Lambda receives the data from the dynamoDB stream. Get Light information from the ETLRoom table and compare it with the current one Determine if the light is on or off, and if there is a change, post it to slack.

slackpost.py


# coding : utf-8
import json
import datetime
import requests
import boto3

LIGHT_TAG='Light'

#===============================================================================
# Slack Incomming Webhooks URL
SLACL_POST_URL = "https://hooks.slack.com/services/XXXXX/YYYYY/ZZZZZ"

# Post to Slack
def PostSlack(message, icon=':ghost:', username='ETLBot', channel="#room"):
    
    Dict_Payload = {
    "text": message,
    "username": username,
    "icon_emoji": icon,
    "channel": channel,
    }
    return requests.post(SLACL_POST_URL, data=json.dumps(Dict_Payload))

#-------------------------------------------------------------------------------
    
def Check_LightChanges(new, old, IsNight):

    Change2Night = None
    Change2Morining = None
    print("new:" , new, ", old:", old, ", IsNight:", IsNight)
    
    if (IsNight=='1') and (new > (old + 50)):
        Change2Morining = True
        IsNight = '0'
    elif (IsNight=='0') and (new < 10):
        Change2Night = True
        IsNight = '1'
    
    # Down -> UP
    if Change2Morining:
        message = ":smiley: Light is Turned On. Good Morning! :smiley:"
        icon = ":smiley:"
    # UP -> Down
    elif Change2Night:
        message = ":ghost: Light is Turned Down. Good Bye! :ghost:"
        icon = ":ghost:"
    else:
        return IsNight

    PostSlack(message, icon=icon)
    return IsNight
#-------------------------------------------------------------------------------

table = None
def update_table(data):
    ID = data['ID']['S']
    
    # Access to ETLRoom Table
    global table
    if not table:
        table = boto3.resource('dynamodb').Table('ETLRoom')
    response = table.get_item(Key={'ID': ID})
    if response:
        item = response['Item']
        #PostSlack(json.dumps(item))
        light = round(float(item['Data'][LIGHT_TAG]['N']))
        IsNight = item['IsNight']
    else:
        light = 0

    IsNight = Check_LightChanges(round(float(data[LIGHT_TAG]['N'])), light, IsNight)
    
    # Update Room Table
    response = table.put_item(
    Item={
          "ID": ID,
          "Data" : data,
          "IsNight": IsNight
        }
    )

    return 0

#-------------------------------------------------------------------------------

def lambda_handler(event, context):
    
    try:
        for record in event['Records']:
            dynamodb = record['dynamodb']
            keys = dynamodb['Keys']
            data = dynamodb['NewImage']
        
        # Keys are "ID" and "Datetime".
        id = keys['ID']['S']
        datetime = keys['Datetime']['S']
        print("ID:", id, "/ Date:", datetime)
        
        update_table(data)
    except Exception as e:
        import traceback
        message = traceback.format_exc()
        print(message)
        PostSlack('Meets Exception!\n' + message)
        raise e
        
    return 0

#===============================================================================

getroomenv

getroomenv.py


# coding : utf-8
import json
import requests
import boto3

# Slack Incomming Webhooks URL
SLACL_POST_URL = "https://hooks.slack.com/services/XXXXX/YYYYY/ZZZZZ"

#===============================================================================

def MakeStr(data, key, round_n):
    return str(round(float(data[key]['N']), round_n))

table = None
def GetRoomEnv(id, isAll):
    
    global table
    if not table:
        table = boto3.resource('dynamodb').Table('ETLRoom')

    response = table.get_item(
        Key={
            'ID': id
        }
    )
    
    data = response['Item']['Data']
    light = MakeStr(data, 'Light', 1)
    temp = MakeStr(data, 'TargetTemp', 1)
    humid = MakeStr(data, 'Humidity', 1)
    balo = MakeStr(data, 'Barometer', 1)
    time = data['Datetime']['S']
    
    message = "" \
        + "The current temperature is" + temp + "Degree, humidity"+ humid + "Degree.\n" \
        + "Brightness" + light + "In lux, the barometric pressure is"+ balo + "It is hPa.\n" \
        + "(" + time + "Measured to:bar_chart:)"
        
    #If there is an argument of ALL, if there is an argument, dump all data
    if isAll:
        message = ""
        for d in data:
            s = str(d)
            v = data[s]
            if "N" in v:
                message += s + ":" + v["N"] + "\n"
            else:
                message += s + ":" + v["S"] + "\n"

    return message
    
#-------------------------------------------------------------------------------

# POST to Slack
def PostSlack(message):
    
    Dict_Payload = {
    "text": message,
    "username": 'NowBot',
    "icon_emoji": ":clock3:",
    "channel": '#room',
    }
    return requests.post(SLACL_POST_URL, data=json.dumps(Dict_Payload))

#-------------------------------------------------------------------------------
def lambda_handler(event, context):
    
    isAll = False
    try:
        tri1 = 'text='
        tri2 = 'trigger_word='
        body = event['body']
        tag = body[body.find(tri1)+len(tri1):body.find(tri2)-1]
        taglist = tag.split("+")
        for word in taglist:
            if "ALL" in word:
                isAll = True
    except:
        pass
    
    try:
        message = GetRoomEnv('Raspi_1', isAll)
    except Exception as e:
        import traceback
        message = traceback.format_exc()
        print(message)
        PostSlack('Meets Exception!\n' + message)
        raise e
        
    PostSlack(message)
    return 0

#===============================================================================

API Gateway

Slack Incoming WebHooks and Outgoing WebHooks have been added to Custom Integrations. Now you can use the UI. Super convenient.

Incoming WebHooks https://hooks.slack.com/services/XXXXXXX/YYYYYYY/ZZZZZZ Generate and set the URL for posting to Slack. All you have to do is post from Lambda. You can tell whether it is on or off. The following is an example.

<img width = "556" alt = "Screenshot 2017-09-05 21.07.36.png " src = "https://qiita-image-store.s3.amazonaws.com/0/171122/6fd49e45-4271" -642d-f5a4-045833411c15.png ">

Looking at this, we can see the following.

--Someone is sneaking in and working secretly on Saturday evening --Yesterday, the person who came early in the morning was probably a cleaner and stayed until 7: 10-7: 53, which was unexpectedly long. --The first team member came at 9:12 AM and the last one returned at 7:49 PM (healthy!)

Outgoing WebHooks Trigger Word The trigger word is "now". Send "now" and NowBot will tell you the status of the room.

<img width = "389" alt = "Screenshot 2017-09-05 20.07.48.png " src = "https://qiita-image-store.s3.amazonaws.com/0/171122/94beb1e4-359a" -45f4-a6bb-01d02354fd63.png ">

Sending "now ALL" will dump all the data in the database. For debugging.

<img width = "437" alt = "Screenshot 2017-09-05 20.11.08.png " src = "https://qiita-image-store.s3.amazonaws.com/0/171122/20954556-c401" -142f-e816-d6a2ade8f29b.png ">

I thought about processing the wording that follows "now" in natural language (such as Amazon Polly) and returning it if there is a related parameter (such as temperature), but that will be the next opportunity. .. ..

URL https://XXXXXX.execute-api.ap-northeast-1.amazonaws.com/prod/getroomenv Is set. It's made with AWS API Gateway, with Lambda behind it. The getroomenv script is running.

Impressions

It took about 3 days to move all the way After that, I brushed up for about two days and watched the situation. I need a little more kung fu for DynamoDB and Lambda. It was a good study. Using Slack for the UI was a great answer. Really convenient.

Since I collected time series data, Use this to take the periodic fluctuation component If you can notify Slack if something is different than usual, I think it is good as the next development. Set aside whether it's normal for people to come to work on Saturdays. .. ..

Recommended Posts

Play to notify Slack of environmental data using AWS PaaS from SensorTag via Raspberry Pi3
Send data from Raspberry Pi using AWS IOT
How to get temperature from switchBot thermo-hygrometer using raspberry Pi
I sent the data of Raspberry Pi to GCP (free)
Notify LINE of body temperature from BLE thermometer with Raspberry Pi # 1
Notify LINE of body temperature from BLE thermometer with Raspberry Pi # 2
Output from Raspberry Pi to Line
Creating a LINE BOT to notify you of additional AtCoder contests using AWS
I tried to notify slack of Redmine update
Automatic launch of Raspberry Pi programs using Systemd
CSV output of pulse data with Raspberry Pi (CSV output)
Connect your Raspberry Pi to your smartphone using Blynk
[AWS] Migrate data from DynamoDB to Aurora MySQL
Memo of migrating Django's DB from SQLite 3 to MySQL on Docker on Raspberry Pi 4B
Try casting videos and websites from Raspberry Pi to Chromecast and Nest Hub using CATT