[PYTHON] The best-value IoT home appliances! Operating TPLink products from a Raspberry Pi

What is TPLink?

TPLink is a network equipment manufacturer based in Shenzhen, China, whose main products are routers. In recent years it has been focusing on IoT home appliances such as smart light bulbs and smart plugs, and it has established a unique position on Amazon thanks to its good cost performance.

plug.jpg [Smart Plug HS105](https://www.amazon.co.jp/Alexa%E8%AA%8D%E5%AE%9A%E5%8F%96%E5%BE%97%E8%A3%BD%E5%93%81%E3%80%91-%E7%9B%B4%E5%B7%AE%E3%81%97%E3%82%B3%E3%83%B3%E3%82%BB%E3%83%B3%E3%83%88-Google%E3%83%9B%E3%83%BC%E3%83%A0%E5%AF%BE%E5%BF%9C-%E9%9F%B3%E5%A3%B0%E3%82%B3%E3%83%B3%E3%83%88%E3%83%AD%E3%83%BC%E3%83%AB-HS105/dp/B078HSBNMT?ref_=ast_sto_dp)

bulb.jpg [Smart Light Bulb KL110](https://www.amazon.co.jp/%E3%80%90Amazon-Alexa%E8%AA%8D%E5%AE%9A-%E3%80%91TP-Link-KL110]-Google/dp/B07GC4JR83?ref_=ast_sto_dp&th=1)

This time, using the API, I tried the following in Python and Node.js:
・ ON-OFF operation of the devices
・ Acquisition of information such as the ON-OFF state and light bulb brightness

Since these two capabilities cover many of the conceivable uses of IoT home appliances, **the result was that I could really feel the potential for applications**!

What you need

**・ PC**
**・ Raspberry Pi**
**・ TPLink smart plug or light bulb**
This time I tried the following three products:
HS105: Smart plug
KL110: White light bulb
KL130: Color light bulb

① Confirmation of data acquisition

First, test on the terminal whether the data can be obtained from TPLink.

Find the IP address

Install tplink-smarthome-api (reference)

sudo npm install -g tplink-smarthome-api

Get a list of connected TPLink devices with the following command

tplink-smarthome-api search
HS105(JP) plug IOT.SMARTPLUGSWITCH 192.168.0.101 9999 B0BE76 ‥ Smart plug
KL110(JP) bulb IOT.SMARTBULB 192.168.0.102 9999 98DAC4 ‥ White light bulb
KL130(JP) bulb IOT.SMARTBULB 192.168.0.103 9999 0C8063 ‥ Color light bulb

You can see that all three devices have been detected
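
If you want to reuse this list from a script, each line can be split into fields. The following is only a minimal sketch: it assumes the whitespace-separated column order shown above (model, type, device class, IP, port, then a trailing identifier), and `FoundDevice` and `parse_search_line` are hypothetical names of my own, not part of tplink-smarthome-api. The real output format may differ between versions.

```python
from typing import NamedTuple


class FoundDevice(NamedTuple):
    model: str
    kind: str
    device_class: str
    ip: str
    port: int


def parse_search_line(line: str) -> FoundDevice:
    """Split one line of the search output into named fields.

    Assumes whitespace-separated columns in the order shown above;
    anything after the port is ignored.
    """
    parts = line.split()
    if len(parts) < 5:
        raise ValueError(f"unexpected search output: {line!r}")
    return FoundDevice(parts[0], parts[1], parts[2], parts[3], int(parts[4]))


sample = """\
HS105(JP) plug IOT.SMARTPLUGSWITCH 192.168.0.101 9999 B0BE76
KL110(JP) bulb IOT.SMARTBULB 192.168.0.102 9999 98DAC4
KL130(JP) bulb IOT.SMARTBULB 192.168.0.103 9999 0C8063
"""
devices = [parse_search_line(line) for line in sample.splitlines() if line.strip()]
for d in devices:
    print(d.model, d.kind, d.ip)
```

The IP addresses collected this way are the ones used in the rest of this article.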

Confirming that device status can be retrieved

You can get the device settings and the on/off state with the following command.

tplink-smarthome-api getSysInfo [IP address of the device]:9999

**・ Example of KL130 (color light bulb)**

  :
  ctrl_protocols: { name: 'Linkie', version: '1.0' },
↓ The device settings start here
  light_state: {
    on_off: 1,
    mode: 'normal',
    hue: 0,
    saturation: 0,
    color_temp: 2700,
    brightness: 100
  },
↑ The device settings end here
  is_dimmable: 1,
  is_color: 1,
  :

The fields seem to mean the following:
on_off: 0 means power off, 1 means power on
hue: Color? (0 in white mode)
color_temp: Color temperature (0 when not in white mode)
brightness: Brightness (in %)

**・ Example of KL110 (white light bulb)**

  :
  ctrl_protocols: { name: 'Linkie', version: '1.0' },
↓ The device settings start here
  light_state: {
    on_off: 1,
    mode: 'normal',
    hue: 0,
    saturation: 0,
    color_temp: 2700,
    brightness: 100
  },
↑ The device settings end here
  is_dimmable: 1,
  is_color: 0,
  :

The fields seem to mean the following:
on_off: 0 means power off, 1 means power on
hue: Hue (0 in white mode)
saturation: Saturation
color_temp: Color temperature (0 when not in white mode)
brightness: Brightness (in %)
The output is almost the same as for the KL130, but because this is not a color bulb, is_color is 0.
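
To make the field meanings above concrete, here is a small sketch that turns a `light_state` dict into a one-line summary. `describe_light_state` is a hypothetical helper of my own, and the white/color distinction follows my reading of the output above (a non-zero `color_temp` means white mode), so treat it as an assumption rather than documented behavior.

```python
def describe_light_state(light_state: dict) -> str:
    """Turn a light_state dict into a short human-readable summary."""
    if light_state.get("on_off") != 1:
        return "off"
    brightness = light_state.get("brightness")
    color_temp = light_state.get("color_temp", 0)
    if color_temp:
        # Assumed white mode: color_temp is non-zero (a color temperature in kelvin)
        return f"on, white {color_temp}K, brightness {brightness}%"
    # Assumed color mode: color_temp is 0 and hue/saturation carry the color
    hue = light_state.get("hue")
    saturation = light_state.get("saturation")
    return f"on, color hue={hue} saturation={saturation}, brightness {brightness}%"


# Values copied from the getSysInfo output shown above
sample = {"on_off": 1, "mode": "normal", "hue": 0, "saturation": 0,
          "color_temp": 2700, "brightness": 100}
print(describe_light_state(sample))
```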

**・ Example of HS105 (smart plug)**

  alias: '',
↓ The device settings start here
  relay_state: 1,
  on_time: 288,
  active_mode: 'none',
  feature: 'TIM',
  updating: 0,
  icon_hash: '',
  rssi: -52,
  led_off: 0,
  longitude_i: 1356352,
  latitude_i: 348422,
↑ The device settings end here
  hwId: '047D‥',

The fields seem to mean the following:
relay_state: 0 for power off, 1 for power on
on_time: Continuous power-on time
rssi: WiFi signal strength
The longitude (longitude_i) and latitude (latitude_i) are also displayed, but mysteriously they point to a location about 5 km away from the actual one.
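
The raw plug values are easier to read after a little conversion. The sketch below is my own illustration, not something the API provides: `describe_plug` is a hypothetical helper, and it assumes that `on_time` is in seconds and that `latitude_i` / `longitude_i` are degrees multiplied by 10,000. Both assumptions look plausible for the sample values above but are not confirmed here.

```python
from datetime import timedelta


def describe_plug(sysinfo: dict) -> dict:
    """Convert raw plug fields into friendlier values."""
    return {
        "is_on": sysinfo["relay_state"] == 1,
        # Assumption: on_time is the number of seconds since power-on
        "on_for": str(timedelta(seconds=sysinfo["on_time"])),
        "rssi": sysinfo["rssi"],
        # Assumption: the *_i fields are degrees multiplied by 10,000
        "latitude": sysinfo["latitude_i"] / 10000,
        "longitude": sysinfo["longitude_i"] / 10000,
    }


# Values copied from the getSysInfo output shown above
sample = {"relay_state": 1, "on_time": 288, "rssi": -52,
          "latitude_i": 348422, "longitude_i": 1356352}
print(describe_plug(sample))
```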

As shown above, you can get the information you want from the command line! The following chapters describe how to get the state and operate the devices from a program (Node.js & Python).

② Get state with Node.js

**Note: If you are thinking "I use Python, so I don't need an explanation of Node.js!", please skip this chapter and move on to ③**

Install Node.js by referring to here

Pass the path to npm (for Windows)

On Windows, the global npm install location is not on the path, so Node.js cannot load the module. Set the path by referring to the following: https://qiita.com/shiftsphere/items/5610f692899796b03f99

Pass the path to npm (for Raspberry Pi)

Find out where npm installs modules globally with the command below (for some reason, this seems to differ from the folder reported by the command "npm bin -g" on Windows).

npm ls -g 

Edit .profile with the following command.

nano /home/[User name]/.profile

Add the following line to the end of the .profile and reboot

export NODE_PATH=[The path examined above]/node_modules

If the following command prints the path you specified, the setting succeeded.

printenv NODE_PATH

Creating a Node.js script

Create the following script

tplink_test.js


const { Client } = require('tplink-smarthome-api');
const client = new Client();
client.getDevice({ host: '192.168.0.102' }).then(device => {
  device.getSysInfo().then(console.log);
});

If you execute the script with the following command, you can get various information in the same way as ①.

node tplink_test.js

③ Get state with Python

I couldn't get logging to work well in Node.js because of my limited JavaScript skills, so I changed course and wrote a script in Python to operate the devices and log their state.

With Python I struggled because I couldn't find documentation as thorough as that for Node.js, but I worked through the code here and here (tplink-lb130-api/blob/master/lb130.py) and created a script.

Creating a TPLink operation class

Referring to the code above, I created the following four classes.
**TPLink_Common(): Class of functions common to plugs and light bulbs**
**TPLink_Plug(): Class of plug-only functions** (inherits TPLink_Common())
**TPLink_Bulb(): Class of light-bulb-only functions** (inherits TPLink_Common())
**GetTPLinkData(): Class that gets data using the classes above**

tplink.py


import socket
from struct import pack
import json

#TPLink data acquisition class
class GetTPLinkData():
    #Method for getting plug data
    def get_plug_data(self, ip):
        #Creating a class for plug operation
        plg = TPLink_Plug(ip)
        #Get data and convert to dict
        rjson = plg.info()
        rdict = json.loads(rjson)
        return rdict

    #Light bulb data acquisition method
    def get_bulb_data(self, ip):
        #Creating a class for operating a light bulb
        blb = TPLink_Bulb(ip)
        #Get data and convert to dict
        rjson = blb.info()
        rdict = json.loads(rjson)
        return rdict

#TPLink bulb & plug common class
class TPLink_Common():
    def __init__(self, ip, port=9999):
        """Default constructor
        """
        self.__ip = ip
        self.__port = port
    
    def info(self):
        cmd = '{"system":{"get_sysinfo":{}}}'
        receive = self.send_command(cmd)
        return receive
    
    def send_command(self, cmd, timeout=10):
        try:
            sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock_tcp.settimeout(timeout)
            sock_tcp.connect((self.__ip, self.__port))
            sock_tcp.settimeout(None)
            sock_tcp.send(self.encrypt(cmd))
            data = sock_tcp.recv(2048)
            sock_tcp.close()

            decrypted = self.decrypt(data[4:])
            print("Sent:     ", cmd)
            print("Received: ", decrypted)
            return decrypted

        except socket.error as e:
            #Raise instead of calling quit() so that callers can catch the error and retry
            raise ConnectionError("Could not connect to host " + self.__ip + ":" + str(self.__port)) from e

    def encrypt(self, string):
        key = 171
        result = pack('>I', len(string))
        for i in string:
            a = key ^ ord(i)
            key = a
            result += bytes([a])
        return result

    def decrypt(self, string):
        key = 171
        result = ""
        for i in string:
            a = key ^ i
            key = i
            result += chr(a)
        return result

#TPLink plug operation class
class TPLink_Plug(TPLink_Common):
    def on(self):
        cmd = '{"system":{"set_relay_state":{"state":1}}}'
        receive = self.send_command(cmd)

    def off(self):
        cmd = '{"system":{"set_relay_state":{"state":0}}}'
        receive = self.send_command(cmd)

    def ledon(self):
        cmd = '{"system":{"set_led_off":{"off":0}}}'
        receive = self.send_command(cmd)

    def ledoff(self):
        cmd = '{"system":{"set_led_off":{"off":1}}}'
        receive = self.send_command(cmd)
    
    def set_countdown_on(self, delay):
        cmd = '{"count_down":{"add_rule":{"enable":1,"delay":' + str(delay) +',"act":1,"name":"turn on"}}}'
        receive = self.send_command(cmd)

    def set_countdown_off(self, delay):
        cmd = '{"count_down":{"add_rule":{"enable":1,"delay":' + str(delay) +',"act":0,"name":"turn off"}}}'
        receive = self.send_command(cmd)
    
    def delete_countdown_table(self):
        cmd = '{"count_down":{"delete_all_rules":null}}'
        receive = self.send_command(cmd)

    def energy(self):
        cmd = '{"emeter":{"get_realtime":{}}}'
        receive = self.send_command(cmd)
        return receive

#TPLink bulb operation class
class TPLink_Bulb(TPLink_Common):
    def on(self):
        cmd = '{"smartlife.iot.smartbulb.lightingservice":{"transition_light_state":{"on_off":1}}}'
        receive = self.send_command(cmd)

    def off(self):
        cmd = '{"smartlife.iot.smartbulb.lightingservice":{"transition_light_state":{"on_off":0}}}'
        receive = self.send_command(cmd)

    def transition_light_state(self, hue: int = None, saturation: int = None, brightness: int = None,
                               color_temp: int = None, on_off: bool = None, transition_period: int = None,
                               mode: str = None, ignore_default: bool = None):
        # copy all given argument name-value pairs as a dict
        d = {k: v for k, v in locals().items() if k != 'self' and v is not None}
        r = {
            'smartlife.iot.smartbulb.lightingservice': {
                'transition_light_state': d
            }
        }
        cmd = json.dumps(r)
        receive = self.send_command(cmd)
        print(receive)

    def brightness(self, brightness):
        self.transition_light_state(brightness=brightness)

    def purple(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=277, saturation=86, color_temp=0, brightness=brightness, transition_period=transition_period)

    def blue(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=240, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)

    def cyan(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=180, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)

    def green(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=120, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)
    
    def yellow(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=60, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)

    def orange(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=39, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)
    
    def red(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=0, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)
    
    def lamp_color(self, brightness = None):
        self.transition_light_state(color_temp=2700, brightness=brightness)
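
The encrypt() and decrypt() methods in TPLink_Common are the least obvious part of the class, so here is a standalone sketch of the same scheme: a 4-byte big-endian length header followed by an XOR chain that starts from the key 171, where each ciphertext byte becomes the key for the next byte. This is my own restatement for illustration. Unlike the class above, this `decrypt` takes the whole message including the header, and it works on bytes rather than characters.

```python
from struct import pack, unpack

INITIAL_KEY = 171


def encrypt(plaintext: str) -> bytes:
    """Return a 4-byte big-endian length header plus the XOR-chained payload."""
    payload = plaintext.encode("utf-8")
    key = INITIAL_KEY
    out = bytearray()
    for byte in payload:
        key = key ^ byte      # each cipher byte becomes the next key
        out.append(key)
    return pack(">I", len(payload)) + bytes(out)


def decrypt(message: bytes) -> str:
    """Inverse of encrypt(): read the header, then undo the XOR chain."""
    (length,) = unpack(">I", message[:4])
    body = message[4:4 + length]
    key = INITIAL_KEY
    out = bytearray()
    for byte in body:
        out.append(key ^ byte)
        key = byte            # the key is the previous cipher byte
    return out.decode("utf-8")


cmd = '{"system":{"get_sysinfo":{}}}'
wire = encrypt(cmd)
print(wire.hex())
print(decrypt(wire) == cmd)
```

Because the round trip needs no network access, it is a convenient way to check the encoding logic before pointing the script at a real device.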

How to use the TPLink operation classes

The classes above can be used from Python code as follows (replace the IP addresses with those of your own devices).

**・ When you want to turn on the light bulb**

TPLink_Bulb('192.168.0.102').on()

**・ When you want to turn off the plug**

TPLink_Plug('192.168.0.101').off()

**・ When you want to turn on the plug after 10 seconds**

TPLink_Plug('192.168.0.101').set_countdown_on(10)

**・ When you want to set the brightness of the light bulb to 10%**

TPLink_Bulb('192.168.0.102').brightness(10)

**・ When you want to make the light bulb red (color light bulb only)**

TPLink_Bulb('192.168.0.103').red()

**・ When you want to get information such as the on/off state (plug example; use get_bulb_data() for a light bulb)**

info = GetTPLinkData().get_plug_data('192.168.0.101')
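
Methods such as set_countdown_on() in tplink.py assemble their JSON command by string concatenation. As a possible alternative, the sketch below builds the same countdown command from a dict with `json.dumps`, which avoids quoting mistakes when more parameters are added. `countdown_command` is a hypothetical helper name of my own; the command structure itself is copied from the class above.

```python
import json


def countdown_command(delay: int, turn_on: bool) -> str:
    """Build the count_down add_rule command as a JSON string."""
    rule = {
        "enable": 1,
        "delay": delay,
        "act": 1 if turn_on else 0,
        "name": "turn on" if turn_on else "turn off",
    }
    # separators=(",", ":") gives the same compact form as the hand-written strings
    return json.dumps({"count_down": {"add_rule": rule}}, separators=(",", ":"))


cmd = countdown_command(10, turn_on=True)
print(cmd)
```

The resulting string could be passed to send_command() in the same way as the hand-written one.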

④ Creation of Python script for logging

Using the last method in the previous chapter, I created a script that logs information about the light bulbs and the plug. The structure of the script is the same as [here](https://qiita.com/c60evaporator/items/283d0569eba58830f86e#%E3%83%A1%E3%82%A4%E3%83%B3%E3%82%B9%E3%82%AF%E3%83%AA%E3%83%97%E3%83%88%E4%BD%9C%E6%88%90), so please read the linked article.

Configuration files

As in [this article](https://qiita.com/c60evaporator/items/283d0569eba58830f86e#%E8%A8%AD%E5%AE%9A%E3%83%95%E3%82%A1%E3%82%A4%E3%83%AB), I created the following two configuration files to make things easier to manage.
-DeviceList.csv: Describes the necessary information for each device

DeviceList.csv
ApplianceName,ApplianceType,IP,Retry
TPLink_KL130_ColorBulb_1,TPLink_ColorBulb,192.168.0.103,2
TPLink_KL110_WhiteBulb_1,TPLink_WhiteBulb,192.168.0.102,2
TPLink_HS105_Plug_1,TPLink_Plug,192.168.0.101,2

The meaning of the columns is as follows:
ApplianceName: Manages device names and identifies multiple devices of the same type
ApplianceType: Device type
  TPLink_ColorBulb: Color light bulb (KL130, etc.)
  TPLink_WhiteBulb: White light bulb (KL110, etc.)
  TPLink_Plug: Smart plug (HS105, etc.)
IP: IP address of the device
Retry: Maximum number of retries (how many times to try again when acquisition fails; see [here](https://qiita.com/c60evaporator/items/283d0569eba58830f86e#%E4%B8%8D%E5%85%B7%E5%90%881peripheral%E3%81%AE%E5%88%9D%E6%9C%9F%E5%8C%96%E6%99%82%E3%81%AB%E3%82%A8%E3%83%A9%E3%83%BC%E3%81%8C%E5%87%BA%E3%82%8B) for details)
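
Before handing the device list to the logger, it can be worth checking that every row has a known type and a numeric retry count. The logging script in this article reads the file with pandas; the sketch below deliberately swaps in the standard `csv` module so that it runs without extra packages. `load_appliances` and `KNOWN_TYPES` are hypothetical names of my own.

```python
import csv
import io

KNOWN_TYPES = {"TPLink_ColorBulb", "TPLink_WhiteBulb", "TPLink_Plug"}

# Same content as the device list shown above
CSV_TEXT = """\
ApplianceName,ApplianceType,IP,Retry
TPLink_KL130_ColorBulb_1,TPLink_ColorBulb,192.168.0.103,2
TPLink_KL110_WhiteBulb_1,TPLink_WhiteBulb,192.168.0.102,2
TPLink_HS105_Plug_1,TPLink_Plug,192.168.0.101,2
"""


def load_appliances(text: str) -> list:
    """Parse the device list and reject rows with an unknown ApplianceType."""
    rows = []
    for row in csv.DictReader(io.StringIO(text)):
        if row["ApplianceType"] not in KNOWN_TYPES:
            raise ValueError(f"unknown ApplianceType: {row['ApplianceType']}")
        row["Retry"] = int(row["Retry"])  # the CSV stores the count as text
        rows.append(row)
    return rows


appliances = load_appliances(CSV_TEXT)
for a in appliances:
    print(a["ApplianceName"], a["IP"], a["Retry"])
```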

-config.ini: Specifies the CSV and log output directories

config.ini
[Path]
CSVOutput = /share/Data/Appliance
LogOutput = /share/Log/Appliance

If you output both to a shared folder created with samba, you can access them from outside the Raspberry Pi, which is convenient.
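
For reference, this is roughly how the two values are read back with the standard `configparser` module. The sketch parses the text from a string with `read_string` so that it runs without a file on disk; a real script would call `cfg.read('./config.ini')` instead.

```python
import configparser

# Same content as the config.ini shown above
CONFIG_TEXT = """\
[Path]
CSVOutput = /share/Data/Appliance
LogOutput = /share/Log/Appliance
"""

cfg = configparser.ConfigParser()
cfg.read_string(CONFIG_TEXT)  # a real script would use cfg.read('./config.ini')

csv_dir = cfg["Path"]["CSVOutput"]
log_dir = cfg["Path"]["LogOutput"]
print(csv_dir)
print(log_dir)
```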

Actual script

appliance_data_logger.py


from tplink import GetTPLinkData
import logging
from datetime import datetime, timedelta
import os
import csv
import configparser
import pandas as pd

#Global variables
global masterdate

######TPLink data acquisition######
def getdata_tplink(appliance):
    #Retry up to appliance.Retry times when no data can be obtained
    applianceValue = None
    for i in range(appliance.Retry):
        try:
            #When it's a plug
            if appliance.ApplianceType == 'TPLink_Plug':
                applianceValue = GetTPLinkData().get_plug_data(appliance.IP)
            #When it's a light bulb
            elif appliance.ApplianceType == 'TPLink_ColorBulb' or appliance.ApplianceType == 'TPLink_WhiteBulb':
                applianceValue = GetTPLinkData().get_bulb_data(appliance.IP)
            else:
                applianceValue = None
        #Log output if an error occurs
        except:
            logging.warning(f'retry to get data [loop{str(i)}, date{str(masterdate)}, appliance{appliance.ApplianceName}]')
            applianceValue = None
            continue
        else:
            break
    
    #If the value can be obtained, store the POST data in dict
    if applianceValue is not None:
        #When it's a plug
        if appliance.ApplianceType == 'TPLink_Plug':
            data = {        
                'ApplianceName': appliance.ApplianceName,        
                'Date_Master': str(masterdate),
                'Date': str(datetime.today()),
                'IsOn': str(applianceValue['system']['get_sysinfo']['relay_state']),
                'OnTime': str(applianceValue['system']['get_sysinfo']['on_time'])
            }
        #When it's a light bulb
        else:
            data = {        
                'ApplianceName': appliance.ApplianceName,        
                'Date_Master': str(masterdate),
                'Date': str(datetime.today()),
                'IsOn': str(applianceValue['system']['get_sysinfo']['light_state']['on_off']),
                'Color': str(applianceValue['system']['get_sysinfo']['light_state']['hue']),
                'ColorTemp': str(applianceValue['system']['get_sysinfo']['light_state']['color_temp']),
                'Brightness': str(applianceValue['system']['get_sysinfo']['light_state']['brightness'])
            }
        return data
        
    #If it could not be obtained, log output
    else:
        logging.error(f'cannot get data [loop{str(appliance.Retry)}, date{str(masterdate)}, appliance{appliance.ApplianceName}]')
        return None

######CSV output of data######
def output_csv(data, csvpath):
    appliancename = data['ApplianceName']
    monthstr = masterdate.strftime('%Y%m')
    #Output destination folder name
    outdir = f'{csvpath}/{appliancename}/{masterdate.year}'
    #When the output destination folder does not exist, create a new one
    os.makedirs(outdir, exist_ok=True)
    #Output file path
    outpath = f'{outdir}/{appliancename}_{monthstr}.csv'
    
    #Create a new output file when it does not exist
    if not os.path.exists(outpath):        
        with open(outpath, 'w', newline="") as f:
            writer = csv.DictWriter(f, data.keys())
            writer.writeheader()
            writer.writerow(data)
    #Add one line when the output file exists
    else:
        with open(outpath, 'a', newline="") as f:
            writer = csv.DictWriter(f, data.keys())
            writer.writerow(data)

######Main######
if __name__ == '__main__':    
    #Get start time
    startdate = datetime.today()
    #Round the start time in minutes
    masterdate = startdate.replace(second=0, microsecond=0)   
    if startdate.second >= 30:
        masterdate += timedelta(minutes=1)

    #Read configuration file and device list
    cfg = configparser.ConfigParser()
    cfg.read('./config.ini', encoding='utf-8')
    df_appliancelist = pd.read_csv('./DeviceList.csv')
    #Total number of devices and number of successful data acquisitions
    appliance_num = len(df_appliancelist)
    success_num = 0

    #Log initialization
    logname = f"/appliancelog_{str(masterdate.strftime('%y%m%d'))}.log"
    logging.basicConfig(filename=cfg['Path']['LogOutput'] + logname, level=logging.INFO)

    #Dict for holding all acquired data
    all_values_dict = None

    ######Data acquisition for each device######
    for appliance in df_appliancelist.itertuples():
        #Confirm that ApplianceType is a TPLink device
        if appliance.ApplianceType in ['TPLink_Plug','TPLink_ColorBulb','TPLink_WhiteBulb']:
            data = getdata_tplink(appliance)
        #Other than those above
        else:
            data = None        

        #When data exists, add it to Dict for holding all data and output CSV
        if data is not None:
            #Create a new dictionary when all_values_dict is None
            if all_values_dict is None:
                all_values_dict = {data['ApplianceName']: data}
            #Add to the existing dictionary when all_values_dict is not None
            else:
                all_values_dict[data['ApplianceName']] = data

            #CSV output
            output_csv(data, cfg['Path']['CSVOutput'])
            #Increment the success count
            success_num+=1

    #Log output of processing end
    logging.info(f'[masterdate{str(masterdate)} startdate{str(startdate)} enddate{str(datetime.today())} success{str(success_num)}/{str(appliance_num)}]')

When you run the script above, the acquired data is written as CSV files, named with the device name and the date, to the folder specified by "CSVOutput" in the configuration file. tplinkcsv.png
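
One detail of the script that is easy to miss is how masterdate is derived: the start time is rounded to the nearest minute, so every row written in one run shares the same Date_Master value. The sketch below isolates that rounding in a function so it can be checked on its own; `round_to_minute` is a hypothetical name of my own, and the logic mirrors the main block above.

```python
from datetime import datetime, timedelta


def round_to_minute(dt: datetime) -> datetime:
    """Round to the nearest minute; 30 seconds or more rounds up."""
    rounded = dt.replace(second=0, microsecond=0)
    if dt.second >= 30:
        rounded += timedelta(minutes=1)
    return rounded


print(round_to_minute(datetime(2020, 5, 1, 12, 0, 29)))
print(round_to_minute(datetime(2020, 5, 1, 12, 0, 30)))
print(round_to_minute(datetime(2020, 5, 1, 23, 59, 45)))
```

The last case shows that rounding up can carry over into the next day, which also changes the date used in the log file name.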

This completes the information acquisition.

In conclusion

Because the script can run 24 hours a day on a Raspberry Pi, and Python gives you more freedom than IFTTT, you can realize all sorts of ideas, for example:
・ Combine with a motion sensor to turn on the lights when a person enters
・ Turn off the lights if nobody has been present for 30 minutes or more
・ Automatically switch the brightness of the light bulb depending on the person
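
As a taste of the second idea, the decision logic could look something like the sketch below. Everything here is hypothetical: `should_turn_off` and `IDLE_LIMIT` are names of my own, and how the last motion time is obtained depends on whatever sensor you use. When the function returns True, the script would call something like TPLink_Bulb(ip).off() from the class earlier in this article.

```python
from datetime import datetime, timedelta
from typing import Optional

IDLE_LIMIT = timedelta(minutes=30)


def should_turn_off(last_motion: Optional[datetime], now: datetime,
                    is_on: bool) -> bool:
    """Return True when the light is on and nobody has been seen for 30 minutes."""
    if not is_on:
        return False
    if last_motion is None:
        # No motion recorded yet: leave the light alone rather than guess
        return False
    return now - last_motion >= IDLE_LIMIT


now = datetime(2020, 5, 1, 21, 0)
print(should_turn_off(datetime(2020, 5, 1, 20, 29), now, is_on=True))
print(should_turn_off(datetime(2020, 5, 1, 20, 45), now, is_on=True))
```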

I have some things I want to make, so I will write an article again when the production is completed.
