[PYTHON] Les appareils électroménagers IoT les plus puissants de Cospa! Exploitez les produits TPLink de Raspberry Pi

Qu'est-ce que TPLink?

Un fabricant d'équipement de réseau à Shenzhen, en Chine, dont l'activité principale est les routeurs. Ces dernières années, nous nous sommes concentrés sur les appareils électroménagers IoT tels que les ampoules intelligentes et les prises intelligentes, et avons établi une position unique sur Amazon en raison de la bonne performance des coûts.

plug.jpg [Smart Plug HS105](https://www.amazon.co.jp/Alexa%E8%AA%8D%E5%AE%9A%E5%8F%96%E5%BE%97%E8%A3%BD % E5% 93% 81% E3% 80% 91-% E7% 9B% B4% E5% B7% AE% E3% 81% 97% E3% 82% B3% E3% 83% B3% E3% 82% BB% E3% 83% B3% E3% 83% 88-Google% E3% 83% 9B% E3% 83% BC% E3% 83% A0% E5% AF% BE% E5% BF% 9C-% E9% 9F% B3 % E5% A3% B0% E3% 82% B3% E3% 83% B3% E3% 83% 88% E3% 83% AD% E3% 83% BC% E3% 83% AB-HS105 / dp / B078HSBNMT? Ref_ = ast_sto_dp)

bulb.jpg [Ampoule intelligente KL110](https://www.amazon.co.jp/%E3%80%90Amazon-Alexa%E8%AA%8D%E5%AE%9A-%E3%80%91TP-Link-KL110] -Google / dp / B07GC4JR83? Ref_ = ast_sto_dp & th = 1)

Cette fois, en utilisant l'API, ・ Fonctionnement ON-OFF de l'appareil ・ Acquisition d'informations telles que ON-OFF et luminosité des ampoules J'ai essayé de l'exécuter en Python et Node.js

Étant donné que ce qui précède peut couvrir de nombreuses applications pouvant être considérées comme des appareils électroménagers IoT. ** Le résultat de sentir la possibilité d'application **!

Les choses nécessaires

** ・ PC ** ** · Tarte aux framboises ** ** ・ Prise intelligente ou ampoule TPLink ** Cette fois, j'ai essayé les 3 produits suivants HS105: prise intelligente KL110: ampoule blanche KL130: ampoule de couleur

① Confirmation de l'acquisition des données

Tout d'abord, testez sur le terminal si les données peuvent être obtenues à partir de TPLink.

Examiner l'IP

Installez tplink-smarthome-api (référence)

sudo npm install -g tplink-smarthome-api

Obtenez une liste des périphériques TPLink connectés avec la commande suivante

tplink-smarthome-api search
HS105(JP) plug IOT.SMARTPLUGSWITCH 192.168.0.101 9999 B0BE76 ‥ Prise intelligente
KL110(JP) bulb IOT.SMARTBULB 192.168.0.102 9999 98DAC4 ‥ Ampoule blanche
KL130(JP) bulb IOT.SMARTBULB 192.168.0.103 9999 0C8063 ‥ Ampoule couleur

Vous pouvez voir que les trois appareils ont été détectés

Confirmation de l'acquisition des informations de fonctionnement de l'appareil

Vous pouvez obtenir les paramètres de l'appareil et OnOff avec la commande suivante

tplink-smarthome-api getSysInfo [Adresse IP de l'appareil]:9999

** ・ Exemple de KL130 (ampoule de couleur) **

  :
  ctrl_protocols: { name: 'Linkie', version: '1.0' },
↓ Voici les paramètres de l'appareil
  light_state: {
    on_off: 1,
    mode: 'normal',
    hue: 0,
    saturation: 0,
    color_temp: 2700,
    brightness: 100
  },
↑ Ceci est le réglage de l'appareil
  is_dimmable: 1,
  is_color: 1,
  :

on_off: 0 signifie hors tension, 1 signifie sous tension teinte: couleur? (0 en mode blanc) color_temp: température de couleur (0 si pas en mode blanc) luminosité: Luminosité (en%) Semble être

** ・ Exemple de KL110 (ampoule blanche) **

  :
  ctrl_protocols: { name: 'Linkie', version: '1.0' },
↓ Voici les paramètres de l'appareil
  light_state: {
    on_off: 1,
    mode: 'normal',
    hue: 0,
    saturation: 0,
    color_temp: 2700,
    brightness: 100
  },
↑ Ceci est le réglage de l'appareil
  is_dimmable: 1,
  is_color: 0,
  :

on_off: 0 signifie hors tension, 1 signifie sous tension teinte: teinte (0 en mode blanc) saturation: saturation color_temp: température de couleur (0 si pas en mode blanc) luminosité: Luminosité (en%) Il paraît que. C'est presque la même chose que KL130, mais ce n'est pas une couleur, donc ça semble être is_color: 0.

** ・ Exemple de KL105 (prise intelligente) **

  alias: '',
↓ Voici les paramètres de l'appareil
  relay_state: 1,
  on_time: 288,
  active_mode: 'none',
  feature: 'TIM',
  updating: 0,
  icon_hash: '',
  rssi: -52,
  led_off: 0,
  longitude_i: 1356352,
  latitude_i: 348422,
↑ Ceci est le réglage de l'appareil
  hwId: '047D‥',

relay_state: 0 signifie mise hors tension, 1 signifie mise sous tension on_time: temps de mise sous tension continue rssi: force du signal WiFi Il paraît que. La logitude et la latitude sont également affichées, mais le mystère s'approfondit lorsqu'il est à 5 km de l'emplacement réel.

Ci-dessus, nous avons confirmé que vous pouvez obtenir les informations souhaitées avec la commande! Dans les chapitres suivants, nous décrirons comment obtenir et utiliser le programme (Node.js & Python).

② Obtenez le statut avec Node.js

** * Si vous n'avez pas besoin d'expliquer Node.js parce que vous utilisez Python, ignorez ce chapitre et passez à ③ **

Reportez-vous à ici pour Node.js

Passez le chemin à npm (pour Window)

Sous Windows, le chemin ne passe pas à la destination d'installation globale de npm et le module ne peut pas être chargé avec Node.js, veuillez donc le transmettre en vous référant à ce qui suit https://qiita.com/shiftsphere/items/5610f692899796b03f99

Passer le chemin vers npm (pour Raspberry Pi)

Utilisez la commande suivante pour savoir où installer le module npm globalement. (Pour une raison quelconque, il semble être différent du dossier trouvé par la commande "npm bin -g" sous Windows)

npm ls -g 

Modifiez .profile avec la commande suivante.

nano /home/[Nom d'utilisateur]/.profile

Ajoutez la ligne suivante à la fin de .profile et redémarrez

export NODE_PATH=[Chemin examiné ci-dessus]/node_modules

Si le chemin spécifié par la commande suivante est affiché, il réussit.

printenv NODE_PATH

Création d'un script node.js

Créez le script suivant

tplink_test.js


const { Client } = require('tplink-smarthome-api');
const client = new Client();
client.getDevice({ host: '192.168.0.102' }).then(device => {
  device.getSysInfo().then(console.log);
});

Si vous exécutez le script avec la commande suivante, vous pouvez obtenir diverses informations de la même manière que ①.

node tplink_test.js

③ Obtenir l'état avec Python

La connexion à Node.js n'a pas fonctionné en raison de mon manque de compétences JavaScript J'ai retrouvé mon esprit et créé un script pour fonctionner et me connecter avec Python.

Python a eu du mal parce que je n'ai pas trouvé de document aussi poli que Node.js, mais ici et ici J'ai déchiffré le code (tplink-lb130-api / blob / master / lb130.py) et créé le script.

Création d'une classe d'opération TPLink

Ci-dessus J'ai créé les 4 classes suivantes en se référant au code. ** TPLink_Common (): Classe de fonctions communes pour les fiches et les ampoules ** ** TPLink_Plug (): classe de fonctions plug-only ** (héritant de TPLink_Common ()) ** TPLink_Bulb (): Classe de fonction ampoule uniquement ** (héritant de TPLink_Common ()) ** GetTPLinkData (): Une classe pour obtenir des données en utilisant la classe ci-dessus **

tplink.py


import socket
from struct import pack
import json

#Classe d'acquisition de données TPLink
class GetTPLinkData():
    #Méthode d'obtention des données de prise
    def get_plug_data(self, ip):
        #Création d'une classe pour l'opération de plug
        plg = TPLink_Plug(ip)
        #Obtenez des données et convertissez-les en dict
        rjson = plg.info()
        rdict = json.loads(rjson)
        return rdict

    #Méthode d'acquisition des données d'ampoule
    def get_bulb_data(self, ip):
        #Créer une classe pour faire fonctionner une ampoule
        blb = TPLink_Bulb(ip)
        #Obtenez des données et convertissez-les en dict
        rjson = blb.info()
        rdict = json.loads(rjson)
        return rdict

#Ampoule et fiche TPLink classe commune
class TPLink_Common():
    def __init__(self, ip, port=9999):
        """Default constructor
        """
        self.__ip = ip
        self.__port = port
    
    def info(self):
        cmd = '{"system":{"get_sysinfo":{}}}'
        receive = self.send_command(cmd)
        return receive
    
    def send_command(self, cmd, timeout=10):
        try:
            sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock_tcp.settimeout(timeout)
            sock_tcp.connect((self.__ip, self.__port))
            sock_tcp.settimeout(None)
            sock_tcp.send(self.encrypt(cmd))
            data = sock_tcp.recv(2048)
            sock_tcp.close()

            decrypted = self.decrypt(data[4:])
            print("Sent:     ", cmd)
            print("Received: ", decrypted)
            return decrypted

        except socket.error:
            quit("Could not connect to host " + self.__ip + ":" + str(self.__port))
            return None

    def encrypt(self, string):
        key = 171
        result = pack('>I', len(string))
        for i in string:
            a = key ^ ord(i)
            key = a
            result += bytes([a])
        return result

    def decrypt(self, string):
        key = 171
        result = ""
        for i in string:
            a = key ^ i
            key = i
            result += chr(a)
        return result

#Classe d'opération de la prise TPLink
class TPLink_Plug(TPLink_Common):
    def on(self):
        cmd = '{"system":{"set_relay_state":{"state":1}}}'
        receive = self.send_command(cmd)

    def off(self):
        cmd = '{"system":{"set_relay_state":{"state":0}}}'
        receive = self.send_command(cmd)

    def ledon(self):
        cmd = '{"system":{"set_led_off":{"off":0}}}'
        receive = self.send_command(cmd)

    def ledoff(self):
        cmd = '{"system":{"set_led_off":{"off":1}}}'
        receive = self.send_command(cmd)
    
    def set_countdown_on(self, delay):
        cmd = '{"count_down":{"add_rule":{"enable":1,"delay":' + str(delay) +',"act":1,"name":"turn on"}}}'
        receive = self.send_command(cmd)

    def set_countdown_off(self, delay):
        cmd = '{"count_down":{"add_rule":{"enable":1,"delay":' + str(delay) +',"act":0,"name":"turn off"}}}'
        receive = self.send_command(cmd)
    
    def delete_countdown_table(self):
        cmd = '{"count_down":{"delete_all_rules":null}}'
        receive = self.send_command(cmd)

    def energy(self):
        cmd = '{"emeter":{"get_realtime":{}}}'
        receive = self.send_command(cmd)
        return receive

#Classe de fonctionnement de l'ampoule TPLink
class TPLink_Bulb(TPLink_Common):
    def on(self):
        cmd = '{"smartlife.iot.smartbulb.lightingservice":{"transition_light_state":{"on_off":1}}}'
        receive = self.send_command(cmd)

    def off(self):
        cmd = '{"smartlife.iot.smartbulb.lightingservice":{"transition_light_state":{"on_off":0}}}'
        receive = self.send_command(cmd)

    def transition_light_state(self, hue: int = None, saturation: int = None, brightness: int = None,
                               color_temp: int = None, on_off: bool = None, transition_period: int = None,
                               mode: str = None, ignore_default: bool = None):
        # copy all given argument name-value pairs as a dict
        d = {k: v for k, v in locals().items() if k is not 'self' and v is not None}
        r = {
            'smartlife.iot.smartbulb.lightingservice': {
                'transition_light_state': d
            }
        }
        cmd = json.dumps(r)
        receive = self.send_command(cmd)
        print(receive)

    def brightness(self, brightness):
        self.transition_light_state(brightness=brightness)

    def purple(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=277, saturation=86, color_temp=0, brightness=brightness, transition_period=transition_period)

    def blue(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=240, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)

    def cyan(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=180, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)

    def green(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=120, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)
    
    def yellow(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=60, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)

    def orange(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=39, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)
    
    def red(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=0, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)
    
    def lamp_color(self, brightness = None):
        self.transition_light_state(color_temp=2700, brightness=brightness)

Comment exécuter la classe d'opération TPLink

La classe ci-dessus peut être exécutée sur du code Python comme ci-dessous

** ・ Lorsque vous souhaitez allumer l'ampoule **

TPLink_Bulb(Adresse IP de l'ampoule).on()

** ・ Lorsque vous souhaitez éteindre la prise **

TPLink_Plug(Adresse IP de la prise).off()

** ・ Lorsque vous souhaitez allumer la prise après 10 secondes **

TPLink_Plug(Adresse IP de la prise).set_countdown_on(10)

** ・ Lorsque vous souhaitez augmenter la luminosité de l'ampoule à 10% **

TPLink_Bulb(Adresse IP de l'ampoule).brightness(10)

** ・ Lorsque vous souhaitez rendre l'ampoule rouge (ampoule de couleur uniquement) **

TPLink_Bulb(Adresse IP de l'ampoule).red()

** ・ Obtenez des informations telles que l'allumage / l'extinction des ampoules **

info = GetTPLinkData().get_plug_data(Adresse IP de la prise)

④ Création d'un script Python pour la journalisation

En utilisant la dernière méthode du chapitre précédent, j'ai créé un script qui enregistre des informations sur les ampoules et les prises. La structure du script est [ici](https://qiita.com/c60evaporator/items/283d0569eba58830f86e#%E3%83%A1%E3%82%A4%E3%83%B3%E3%82%B9%E3%82 C'est la même chose que% AF% E3% 83% AA% E3% 83% 97% E3% 83% 88% E4% BD% 9C% E6% 88% 90), veuillez donc lire le lien.

fichier de configuration

[Ici](https://qiita.com/c60evaporator/items/283d0569eba58830f86e#%E8%A8%AD%E5%AE%9A%E3%83%95%E3%82%A1%E3%82%A4%E3 Comme pour l'article% 83% AB), nous avons créé les deux types de fichiers de paramètres suivants pour une gestion plus facile. -DeviceList.csv: Décrivez les informations nécessaires pour chaque capteur

DeviceList.csv
ApplianceName,ApplianceType,IP,Retry
TPLink_KL130_ColorBulb_1,TPLink_ColorBulb,192.168.0.103,2
TPLink_KL110_WhiteBulb_1,TPLink_WhiteBulb,192.168.0.102,2
TPLink_HS105_Plug_1,TPLink_Plug,192.168.0.101,2

La signification des colonnes est la suivante ApplianceName: gérez les noms d'appareils et identifiez plusieurs appareils du même type ApplianceType: type d'appareil. TPLink_ColorBulb: Ampoule de couleur (KL130, etc.) TPLink_WhiteBulb: Ampoule blanche (KL110 etc.) TPLink_Plug: prise intelligente (HS105 etc.) IP: adresse IP de l'appareil Retry: Nombre maximal de réexécutions Détails (nombre de réexécutions en cas d'échec de l'acquisition, cliquez ici](https://qiita.com/c60evaporator/items/283d0569eba58830f86e#%E4%B8%8D%E5%85%B7%E5 % 90% 881périphérique% E3% 81% AE% E5% 88% 9D% E6% 9C% 9F% E5% 8C% 96% E6% 99% 82% E3% 81% AB% E3% 82% A8% E3% 83 % A9% E3% 83% BC% E3% 81% 8C% E5% 87% BA% E3% 82% 8B))

-Config.ini: Spécifiez le répertoire de sortie CSV et journal

config.ini [Path] CSVOutput = /share/Data/Appliance LogOutput = /share/Log/Appliance Si les deux sont sortis dans un dossier partagé créé par samba, il est pratique d'accéder depuis l'extérieur du Raspberry Pi.

Script réel

appliance_data_logger.py


from tplink import GetTPLinkData
import logging
from datetime import datetime, timedelta
import os
import csv
import configparser
import pandas as pd

#Variables globales
global masterdate

######Acquisition de données TPLink######
def getdata_tplink(appliance):
    #Satisfaction maximale lorsqu'aucune valeur de données n'est disponible.Réessayer de répéter l'analyse
    for i in range(appliance.Retry):
        try:
            #Lors du branchement
            if appliance.ApplianceType == 'TPLink_Plug':
                applianceValue = GetTPLinkData().get_plug_data(appliance.IP)
            #Quand c'est une ampoule
            elif appliance.ApplianceType == 'TPLink_ColorBulb' or appliance.ApplianceType == 'TPLink_WhiteBulb':
                applianceValue = GetTPLinkData().get_bulb_data(appliance.IP)
            else:
                applianceValue = None
        #Sortie de journal si une erreur se produit
        except:
            logging.warning(f'retry to get data [loop{str(i)}, date{str(masterdate)}, appliance{appliance.ApplianceName}]')
            applianceValue = None
            continue
        else:
            break
    
    #Si la valeur peut être obtenue, stockez les données POST dans dict
    if applianceValue is not None:
        #Lors du branchement
        if appliance.ApplianceType == 'TPLink_Plug':
            data = {        
                'ApplianceName': appliance.ApplianceName,        
                'Date_Master': str(masterdate),
                'Date': str(datetime.today()),
                'IsOn': str(applianceValue['system']['get_sysinfo']['relay_state']),
                'OnTime': str(applianceValue['system']['get_sysinfo']['on_time'])
            }
        #Quand c'est une ampoule
        else:
            data = {        
                'ApplianceName': appliance.ApplianceName,        
                'Date_Master': str(masterdate),
                'Date': str(datetime.today()),
                'IsOn': str(applianceValue['system']['get_sysinfo']['light_state']['on_off']),
                'Color': str(applianceValue['system']['get_sysinfo']['light_state']['hue']),
                'ColorTemp': str(applianceValue['system']['get_sysinfo']['light_state']['color_temp']),
                'Brightness': str(applianceValue['system']['get_sysinfo']['light_state']['brightness'])
            }
        return data
        
    #S'il n'a pas pu être obtenu, enregistrer la sortie
    else:
        logging.error(f'cannot get data [loop{str(appliance.Retry)}, date{str(masterdate)}, appliance{appliance.ApplianceName}]')
        return None

######Sortie CSV des données######
def output_csv(data, csvpath):
    appliancename = data['ApplianceName']
    monthstr = masterdate.strftime('%Y%m')
    #Nom du dossier de destination de sortie
    outdir = f'{csvpath}/{appliancename}/{masterdate.year}'
    #Lorsque le dossier de destination de sortie n'existe pas, créez-en un nouveau
    os.makedirs(outdir, exist_ok=True)
    #Chemin du fichier de sortie
    outpath = f'{outdir}/{appliancename}_{monthstr}.csv'
    
    #Créer un nouveau fichier de sortie lorsqu'il n'existe pas
    if not os.path.exists(outpath):        
        with open(outpath, 'w', newline="") as f:
            writer = csv.DictWriter(f, data.keys())
            writer.writeheader()
            writer.writerow(data)
    #Ajouter une ligne lorsque le fichier de sortie existe
    else:
        with open(outpath, 'a', newline="") as f:
            writer = csv.DictWriter(f, data.keys())
            writer.writerow(data)

######Principale######
if __name__ == '__main__':    
    #Obtenir l'heure de début
    startdate = datetime.today()
    #Arrondissez l'heure de début en minutes
    masterdate = startdate.replace(second=0, microsecond=0)   
    if startdate.second >= 30:
        masterdate += timedelta(minutes=1)

    #Lire le fichier de configuration et la liste des appareils
    cfg = configparser.ConfigParser()
    cfg.read('./config.ini', encoding='utf-8')
    df_appliancelist = pd.read_csv('./ApplianceList.csv')
    #Nombre total de capteurs et acquisition de données réussie
    appliance_num = len(df_appliancelist)
    success_num = 0

    #Initialisation du journal
    logname = f"/appliancelog_{str(masterdate.strftime('%y%m%d'))}.log"
    logging.basicConfig(filename=cfg['Path']['LogOutput'] + logname, level=logging.INFO)

    #Dict pour conserver toutes les données acquises
    all_values_dict = None

    ######Acquisition de données pour chaque appareil######
    for appliance in df_appliancelist.itertuples():
        #Confirmez que le type d'appareil est TPLinke
        if appliance.ApplianceType in ['TPLink_Plug','TPLink_ColorBulb','TPLink_WhiteBulb']:
            data = getdata_tplink(appliance)
        #Autre que ceux ci-dessus
        else:
            data = None        

        #Lorsque des données existent, ajoutez-les à Dict pour contenir toutes les données et générer le CSV
        if data is not None:
            #all_values_Créer un nouveau dictionnaire lorsque dict est None
            if all_values_dict is None:
                all_values_dict = {data['ApplianceName']: data}
            #all_values_Ajouter au dictionnaire existant lorsque dict n'est pas None
            else:
                all_values_dict[data['ApplianceName']] = data

            #Sortie CSV
            output_csv(data, cfg['Path']['CSVOutput'])
            #Numéro de réussite plus
            success_num+=1

    #Sortie du journal de la fin du traitement
    logging.info(f'[masterdate{str(masterdate)} startdate{str(startdate)} enddate{str(datetime.today())} success{str(success_num)}/{str(appliance_num)}]')

Si vous exécutez ce qui précède, les données acquises seront sorties au format CSV avec le nom de l'appareil, la date et l'heure dans le dossier spécifié dans le fichier de configuration "CSVOutput". tplinkcsv.png

Ceci termine l'acquisition des informations.

en conclusion

Il fonctionne 24 heures sur 24 sur RaspberrypPi, et Python a plus de liberté que IFTTT, vous pouvez donc incarner diverses idées. ・ Combiné avec un capteur humain pour allumer l'électricité lorsqu'une personne entre ・ Éteignez la lumière s'il n'y a personne pendant 30 minutes ou plus ・ Commute automatiquement la luminosité de l'ampoule en fonction de la personne Etc.

J'ai des choses que je veux faire, alors je vous en reparlerai lorsque la production sera terminée.

Recommended Posts

Les appareils électroménagers IoT les plus puissants de Cospa! Exploitez les produits TPLink de Raspberry Pi
Comment utiliser facilement les appareils électroménagers IOT de Siri par piratage API
Envoyer des données depuis Raspberry Pi à l'aide d'AWS IOT
Accédons à votre tarte à la râpe domestique depuis l'extérieur de votre maison avec VPN (WireGuard)
Associez SORACOM aux appareils ménagers et au LINE Bot [Python / Flask / Raspberry Pi]
Sortie du Raspberry Pi vers la ligne