* Python * Steuert ECHONET Lite-kompatible Haushaltsgeräte (Emulatoren)

Einführung

Dieser Artikel kann von jedem verstanden werden, der Flask verwendet und ein Socket-Kommunikationsprogramm geschrieben hat. Versuchen wir, Haushaltsgeräte mit Flask of Python zu steuern. Node.js wird häufig zur Steuerung von Haushaltsgeräten verwendet. Da jedoch die Anzahl der Benutzer von Python zunimmt, verwende ich Python zur Steuerung von Haushaltsgeräten. Zunächst wird als Kommunikationsprotokoll ECHONET Lite verwendet, das standardmäßig bei der Steuerung von Haushaltsgeräten verwendet wird. Nicht viele Haushaltsgeräte sind mit ECHONET Lite kompatibel. Selbst wenn es nicht von ECHONET Lite gesteuert werden kann, kann es möglicherweise von dort aus gesteuert werden, wenn WebAPI usw. bereitgestellt wird. Suchen Sie nach einer Web-API sowie nach ECHONET Lite, wenn Sie ein zu steuerndes Haushaltsgerät finden. In Bezug auf die Haushaltsgeräte, die dieses Mal gesteuert werden sollen, ist es schwierig, ECHONET LITE-kompatible Haushaltsgeräte vorzubereiten. Daher verwenden wir stattdessen einen Emulator.

Annahme

Die Entwicklungsumgebung ist wie folgt. Dieses Mal werden wir zwei PCs verwenden. --PC zum Ausführen des Emulators - Windows 10 (64bit)

--PC zum Ausführen des Steuerprogramms für Haushaltsgeräte - Python 3.8.2 - Ubuntu 18.04.5 LTS

Ausführen des Emulators

Der Emulator verwendet Moekaden Room. Gehen Sie also zu GitHub Page und gehen Sie zu README.md Laden Sie die komprimierte Datei von Download ausführbare Dateien von herunter. Extrahieren Sie die komprimierte Datei und führen Sie "MoekadenRoom.exe" aus. Um es auszuführen, muss Java installiert sein. Wenn Java nicht installiert ist, installieren Sie es von hier. Wenn Sie MoekadenRoom.exe ausführen, wird der Emulator wie unten gezeigt angezeigt. Sie können Haushaltsgeräte mit GUI bedienen, versuchen Sie es also bitte.

Steuerung von Haushaltsgeräten (Emulator) durch Python (kein Steuerbefehl / Antwort)

Senden Sie mit Python Steuerbefehle an Haushaltsgeräte. Da es fast mit einem allgemeinen Socket-Kommunikations-Client-Programm identisch ist, wird zuerst der Beispielcode angezeigt. Wenn Sie den Beispielcode bei laufendem MoekadenRoom ausführen, leuchten die Emulatorlichter auf.

Beispielcode

echonet_lite.py


import socket
import binascii


def send_command():
    #IP-Adresse des PCs, auf dem Moekaden Room ausgeführt wird (jeweils ändern)
    ip = '192.168.0.10'
    #ECHONET Lite verwendet den UDP-Port 3610
    ECHONETport = 3610
    format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
    data_format = {
        format_echonet_lite[0]: '1081',
        format_echonet_lite[1]: '0000',
        format_echonet_lite[2]: '05FF01',
        format_echonet_lite[3]: '029001',
        format_echonet_lite[4]: '60',
        format_echonet_lite[5]: '01',
        format_echonet_lite[6]: '80',
        format_echonet_lite[7]: '01',
        format_echonet_lite[8]: '30'  #Sie können das Licht ausschalten, indem Sie auf 31 wechseln
    }

    frame = ''
    for key in format_echonet_lite:
        frame += data_format[key]

    #Konvertieren Sie eine hexadezimale Zeichenfolge in eine Byte-Zeichenfolge
    msg = binascii.unhexlify(frame)

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.sendto(msg, (ip, ECHONETport))
    
    
if __name__ == '__main__':
    send_command();

Rahmen zum Übertragen

Dies ist eine Erklärung des Beispielcodes. Erstellen Sie eine zu übertragende Nachricht basierend auf dem Format des ECHONET Lite-Frames. Auf der folgenden Seite finden Sie Erläuterungen zum Rahmen.

Beziehen Sie sich auf die obige Seite und geben Sie Werte für "EHD, TID, SEOJ, DEOJ, ESV, OPC, EPC, PDC, EDT" ein. Bitte überprüfen Sie die obige Seite für Details. Der Wert ist eine Hexadezimalzahl.

Nachrichtenkomponente Wert (Beschreibung)
ETD 1081 (Der ECHONET-Protokolltyp 1081 ist ECHONET Lite)
TID 0000 (Eine ID zum Verknüpfen einer Anforderung mit einer Antwort, die ein beliebiger numerischer Wert sein kann.)
SEOJ 05FF01 (Zeigt das Quellgerät an. Der Wert der Steuerung wird eingegeben.
Referenz:Detaillierte Vorschriften für Geräteobjekte)
DEOJ 029001 (Zeigt das Zielgerät (Beleuchtung) an. Ich lege den Wert der Beleuchtung)
ESV 60 (60 ist ein Steuerbefehl ohne Antwort vom Ziel)
OPC 01 (Gibt die Anzahl der Eigenschaften an, die in einer Nachricht verarbeitet werden sollen)
EPC 80 (Geben Sie die Eigenschaft an.MoekadenRoomLiesmich.SuchenachLightningEPCvonmd)
PDC 01 (Gibt die Anzahl der Bytes in EDT an. 01 weil es nur einen Steuerinhalt gibt)
EDT 30 (Geben Sie den Wert gemäß dem von EPC angegebenen Eigennamen ein.MoekadenRoomLiesmich.SuchenSienachEDT,umdasLichtvonmdeinzuschalten)

Steuerung von Haushaltsgeräten (Emulator) durch Python (mit Steuerbefehl / Antwort)

Der vorherige Steuerbefehl reagierte nicht. Hier senden wir einen Steuerbefehl und erhalten eine Antwort vom Haushaltsgerät. Schalten Sie wie zuvor die Lampe des Emulators ein. Der Unterschied besteht darin, dass Sie eine Antwort erhalten. Mit anderen Worten, wir haben zuvor nur das Socket-Kommunikations-Client-Programm erstellt, aber hier müssen wir auch das Socket-Kommunikationsserver-Programm erstellen, um die Antwort zu erhalten. Wir werden zwei Methoden erklären, eine besteht darin, die Dateien für das Client-Programm und das Server-Programm zu trennen und getrennt auszuführen, und die andere darin, sie in einer Datei zu kombinieren. Zunächst werde ich aus dem einfachen ersteren erklären.

So trennen Sie Client und Server in separate Dateien

Das Client-Programm ist fast das gleiche wie zuvor. Es ist der Frame-Wert von ECHONET Lite, der geändert wird. Indem Sie den Wert von "ESV" auf "61" setzen, können Sie einen Steuerbefehl mit einer Antwort senden.

echonet_lite.py


import socket
import binascii


def send_command():
    #IP-Adresse des PCs, auf dem Moekaden Room ausgeführt wird (jeweils ändern)
    ip = '192.168.0.10'
    #ECHONET Lite verwendet den UDP-Port 3610
    ECHONETport = 3610
    format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
    data_format = {
        format_echonet_lite[0]: '1081',
        format_echonet_lite[1]: '0000',
        format_echonet_lite[2]: '05FF01',
        format_echonet_lite[3]: '029001',
        format_echonet_lite[4]: '61', #Ändern Sie das vorherige Programm (60 → 61)
        format_echonet_lite[5]: '01',
        format_echonet_lite[6]: '80',
        format_echonet_lite[7]: '01',
        format_echonet_lite[8]: '30' 
    }

    frame = ''
    for key in format_echonet_lite:
        frame += data_format[key]

    #Konvertieren Sie eine hexadezimale Zeichenfolge in eine Byte-Zeichenfolge
    msg = binascii.unhexlify(frame)

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.sendto(msg, (ip, ECHONETport))
    
    
if __name__ == '__main__':
    send_command();

Das Programm, das die Antwort empfängt, unterscheidet sich nicht von einem allgemeinen Socket-Kommunikationsserverprogramm. Da die Antwort jedoch als Byte-Zeichenfolge empfangen wird, muss sie in eine hexadezimale Zeichenfolge konvertiert werden. Da die zum Zeitpunkt der Antwort empfangenen Daten dasselbe Format wie der zuvor gesendete Frame haben, kann der Inhalt gelesen werden, indem er in eine hexadezimale Zeichenfolge konvertiert wird.

socket_server.py


import socket
import binascii

def receive_state():
    ECHONETport = 3610

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(('', ECHONETport))
    data, addr = sock.recvfrom(4096)
    
    #Konvertieren Sie eine Byte-Zeichenfolge in eine hexadezimale Zeichenfolge
    data = str(binascii.hexlify(data), 'utf-8')
    print(data)


if __name__ == '__main__':
    receive_state()

Starten Sie zuerst MoekadenRoom und führen Sie das Serverprogramm und dann das Clientprogramm in dieser Reihenfolge aus. Als Ergebnis der Ausführung des Serverprogramms wird die folgende Ausgabe erhalten.

$ python3 socket_server.py
#Ausgabeergebnis
1081000002900105ff0171018000

Wenn das Ausgabeergebnis gelesen wird, ist es wie folgt. Aus dem Wert von "EPC" können wir erkennen, dass es sich um eine Antwort auf den Steuerbefehl handelt.

Nachrichtenkomponente Wert (Beschreibung)
ETD 1081 (Der ECHONET-Protokolltyp 1081 ist ECHONET Lite)
TID 0000 (Eine ID zum Verknüpfen einer Anforderung mit einer Antwort, die ein beliebiger numerischer Wert sein kann.)
SEOJ 029001 (Quellgerät: Beleuchtung)
DEOJ 05FF01 (Zielgerät: Controller)
ESV 71 (Die Antwort auf 61 wird mit 71 zurückgegeben)
OPC 01 (Anzahl der Eigenschaften, die in einer Nachricht verarbeitet werden sollen)
EPC 80 (Eigenschaftsspezifikation. Blitz EPC)
PDC 00 (Anzahl der Bytes in EDT. 00 weil es keine Kontrolle ist)
EDT Himmel(EDT wird nicht angegeben, da es sich nicht um ein Steuerelement handelt)

So kombinieren Sie Client und Server in einer Datei

Im Serverprogramm wartet es, bis es die Daten empfängt (Blockierung), sodass die Verarbeitung an diesem Teil stoppt. Um das Client-Programm gleichzeitig zu verarbeiten, ist es besser, "thread" zu verwenden. Das Serverprogramm wird in einem separaten Thread verarbeitet.

Um die Verarbeitung an einen anderen Thread zu übergeben, erstellen Sie eine Instanz der Klasse "Thread" und geben Sie die Funktion, die Sie in einem anderen Thread verarbeiten möchten, im Argument "Ziel" an. Starten Sie thread mit der start Methode.

th = Thread(target=function)
th.start()

Hier ist der Beispielcode. Es ist ein Programm, das einen Steuerbefehl an die Beleuchtung sendet und 3 Sekunden nach Ausführung des Programms eine Antwort zurückgibt.

echonet_lite.py


import socket
import binascii
from threading import Thread
import time

def send_command():
    #IP-Adresse des PCs, auf dem Moekaden Room ausgeführt wird (jeweils ändern)
    ip = '192.168.0.10'
    ECHONETport = 3610
    format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
    data_format = {
        format_echonet_lite[0]: '1081',
        format_echonet_lite[1]: '0000',
        format_echonet_lite[2]: '05FF01',
        format_echonet_lite[3]: '029001',
        format_echonet_lite[4]: '61',
        format_echonet_lite[5]: '01',
        format_echonet_lite[6]: '80',
        format_echonet_lite[7]: '01',
        format_echonet_lite[8]: '30',   #Sie können das Licht ausschalten, indem Sie auf 31 wechseln
    }

    frame = ''
    for key in format_echonet_lite:
        frame += data_format[key]
    msg = binascii.unhexlify(frame)

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.sendto(msg, (ip, ECHONETport))
    print('send')


def receive_state():
    ECHONETport = 3610

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(('', ECHONETport))
    data, addr = sock.recvfrom(4096)
    data = str(binascii.hexlify(data), 'utf-8')
    print('receive')
    print(data)


if __name__ == '__main__':
    th = Thread(target=receive_state)
    th.start()
    time.sleep(3)
    send_command()

Steuerung von Haushaltsgeräten (Emulator) durch Python (Statusanforderung)

Als Kontrolle für Haushaltsgeräte werden wir versuchen, den aktuellen Status von Haushaltsgeräten zu ermitteln. Zusätzlich zu den oben erläuterten Steuerbefehlen (mit / ohne Antwort) sind die hier erläuterten Statusanforderungen die Haupttypen der Steuerung von Haushaltsgeräten. Da auch eine Antwort auf die Statusanforderung erfolgt, müssen sowohl ein Client-Programm als auch ein Server-Programm für die Socket-Kommunikation erstellt werden. In der Statusanforderung beträgt der Wert von "ESV" zum Zeitpunkt der Übertragung "62" und der Wert von "ESV" zum Zeitpunkt der Antwort "72". Da es sich nicht um einen Steuerbefehl handelt, ist der Wert von "PDC" "00" und der Wert von "EDT" ist "leer". Basierend auf diesen wird der Beispielcode unten gezeigt. Im Beispielcode wird die Antwort alle 5 Sekunden ausgegeben. Klicken Sie während der Ausführung des Beispielcodes auf die Beleuchtungstaste des Moekaden-Raums, um ihn ein- und auszuschalten. Der EDT-Ausgabewert sollte sich ändern.

echonet_lite.py


import socket
import binascii
from threading import Thread
import time

def receive_state():
    ECHONETport = 3610
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(('', ECHONETport))
    while True:
        data, addr = sock.recvfrom(4096)
        data = str(binascii.hexlify(data), 'utf-8')
        print(data)

def confirm_state():
    ip = '192.168.0.10'
    ECHONETport = 3610
    format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
    data_format = {
        format_echonet_lite[0]: '1081',
        format_echonet_lite[1]: '0000',
        format_echonet_lite[2]: '05FF01',
        format_echonet_lite[3]: '029001',
        format_echonet_lite[4]: '62',
        format_echonet_lite[5]: '01',
        format_echonet_lite[6]: '80',
        format_echonet_lite[7]: '00',
        format_echonet_lite[8]: ''
    }

    frame = ''
    for key in format_echonet_lite:
        frame += data_format[key]
    msg = binascii.unhexlify(frame)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    while True:
        sock.sendto(msg, (ip, ECHONETport))
        time.sleep(5)
        
if __name__ == '__main__':
    th1 = Thread(target=receive_state)
    th1.start()
    confirm_state()

Zusammenarbeit mit Flask

Verwenden Sie schließlich Flask, um einen HTTP-Server einzurichten und Haushaltsgeräte zu steuern. Das Verwalten des Status von Haushaltsgeräten mit einem HTTP-Server wird häufig bei der Steuerung mehrerer Haushaltsgeräte verwendet. Auf diese Weise können Haushaltsgeräte von Android aus gesteuert und IoT-Daten erfasst werden. Schauen wir uns zunächst ein einfaches Programm an, das die Flasche und Haushaltsgeräte als einen Schritt in diese Richtung steuert. Hier wird "Warteschlange" verwendet, da mehrere Threads verwendet werden. queue ist eine Struktur, mit der Daten in der Reihenfolge abgerufen werden können, in der sie eingegeben wurden. Die grundlegende Verwendung von "Warteschlange" ist wie folgt.

#Warteschlange erstellen
q = Queue();
#Fügen Sie Daten mit "put" in die Warteschlange ein und rufen Sie sie mit "get" ab.
q_msg = {"name": "foge"}
q.put(q_msg)
q_result = q.get()
print(q_result["name"])

Jetzt ist es etwas kompliziert, aber hier ist ein Beispielcode. Das Verfahren zum Ausführen des Programms besteht darin, zuerst MoekadenRoom zu starten, dann das Programm auszuführen und schließlich auf "http: //192.168.0.10: 5000" zuzugreifen. Es ist ein Programm, das alle 5 Sekunden den Status von Haushaltsgeräten erfasst und ausgibt und alle 10 Sekunden EIN / AUS-Befehle an Haushaltsgeräte gibt.

app.py


import socket
import binascii
from flask import Flask
from queue import Queue
from threading import Thread
import time

app = Flask(__name__)

q = Queue()
state_q = Queue()

@app.route('/', methods=['GET'])
def home():
    th1 = Thread(target=receive_state)
    th1.start()
    th2 = Thread(target=confirm_state)
    th2.start()
    th3 = Thread(target=send_command)
    th3.start()
    return 'This is a Sample Webpage'
    
def send_command():
    while True:
        time.sleep(10)
        if state_q.empty():
            state = '30'
        else:
            q_state_msg = state_q.get()
            q_state = q_state_msg['state']
            if q_state == '30':
                state = '31'
            else:
                state = '30'
        #IP-Adresse des PCs, auf dem Moekaden Room ausgeführt wird (jeweils ändern)
        ip = '192.168.0.10'
        ECHONETport = 3610
        format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
        data_format = {
            format_echonet_lite[0]: '1081',
            format_echonet_lite[1]: '0000',
            format_echonet_lite[2]: '05FF01',
            format_echonet_lite[3]: '029001',
            format_echonet_lite[4]: '60',
            format_echonet_lite[5]: '01',
            format_echonet_lite[6]: '80',
            format_echonet_lite[7]: '01',
            format_echonet_lite[8]: state  # state: 30 or 31 (ON or OFF)
        }
        frame = ''
        for key in format_echonet_lite:
            frame += data_format[key]
        msg = binascii.unhexlify(frame)
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.sendto(msg, (ip, ECHONETport))
        print('send_command')

def receive_state():
    ECHONETport = 3610
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(('', ECHONETport))
    while True:
        data, addr = sock.recvfrom(4096)
        data = str(binascii.hexlify(data), 'utf-8')
        queue_msg = q.get()
        if state_q.empty():
            data_format = queue_msg['format']
            state = get_state(data, data_format)
            q_state_msg = {
                'state': state
            }
            state_q.put(q_state_msg)
        print(data)

def confirm_state():
    ip = '192.168.0.10'
    ECHONETport = 3610
    format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
    data_format = {
        format_echonet_lite[0]: '1081',
        format_echonet_lite[1]: '0000',
        format_echonet_lite[2]: '05FF01',
        format_echonet_lite[3]: '029001',
        format_echonet_lite[4]: '62',
        format_echonet_lite[5]: '01',
        format_echonet_lite[6]: '80',
        format_echonet_lite[7]: '00',
        format_echonet_lite[8]: ''
    }
    frame = ''
    for key in format_echonet_lite:
        frame += data_format[key]
    msg = binascii.unhexlify(frame)

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    while True:
        sock.sendto(msg, (ip, ECHONETport))
        print('send_confirm_state')
        queue_msg = {
            'format': data_format
        }
        q.put(queue_msg)
        time.sleep(5)

#Eine Funktion zum Abrufen des EDT-Werts aus einem hexadezimalen Zeichenfolgenrahmen
def get_state(data, data_format):
    data_list = []
    data_pos = 0
    for value in data_format.values():
        element_size = len(value)
        if element_size == 0:
            element_size = 2
        data_list.append(data[data_pos: data_pos + element_size])
        data_pos += element_size
    state = data_list[8]
    return state

if __name__ == '__main__':
    app.run(host='0.0.0.0', debug=True)

Am Ende

Ich habe ECHONET Lite mit Python verwendet, um Haushaltsgeräte (Emulatoren) zu steuern. Dieses Mal haben wir uns auf die einfachste steuerbare Beleuchtung konzentriert. Versuchen Sie anhand dieses Beispielcodes, verschiedene Steuerelemente für Haushaltsgeräte zu realisieren. Es gibt nicht viele Informationen zur Steuerung von Haushaltsgeräten mit Python. Erhöhen Sie daher die Anzahl der Qiita-Artikel nach und nach. (Bis Sie die am besten geeignete Schnittstelle zur Steuerung Ihrer Haushaltsgeräte gefunden haben ...)

Recommended Posts

* Python * Steuert ECHONET Lite-kompatible Haushaltsgeräte (Emulatoren)
Betreiben Sie ECHONET Lite-Haushaltsgeräte mit Python
Betreiben Sie Haushaltsgeräte mit Python und IRKit