Cet article peut être compris par quiconque a utilisé Flask et a écrit un programme de communication socket. Essayons de contrôler les appareils ménagers en utilisant Flask of Python. Node.js est souvent utilisé pour contrôler les appareils ménagers, mais comme le nombre de personnes utilisant Python augmente, j'utiliserai Python pour contrôler les appareils ménagers. Tout d'abord, comme protocole de communication, ECHONET Lite, qui est utilisé en standard lors du contrôle des appareils ménagers, est utilisé. Peu d'appareils électroménagers sont compatibles avec ECHONET Lite. Cependant, même s'il ne peut pas être contrôlé par ECHONET Lite, il peut être possible de le contrôler à partir de là si WebAPI etc. est fourni. Lorsque vous recherchez un appareil électroménager à contrôler, essayez de rechercher l'API Web ainsi que ECHONET Lite. Ensuite, concernant les appareils électroménagers à contrôler cette fois, il est difficile de préparer des appareils électroménagers compatibles ECHONET LITE, nous allons donc utiliser un émulateur à la place.
L'environnement de développement est le suivant. Cette fois, nous utiliserons deux PC. --PC pour exécuter l'émulateur - Windows 10 (64bit) --Java installé
--PC pour exécuter le programme de contrôle d'appareils ménagers - Python 3.8.2 - Ubuntu 18.04.5 LTS
L'émulateur utilise Moekaden Room, alors allez sur GitHub Page et allez sur README.md Téléchargez le fichier compressé depuis
Télécharger les exécutables de
. Extrayez le fichier compressé et exécutez MoekadenRoom.exe
. Pour l'exécuter, Java doit être installé. Si Java n'est pas installé, installez-le depuis ici. Lorsque vous exécutez MoekadenRoom.exe
, l'émulateur s'affiche comme illustré ci-dessous. Vous pouvez utiliser des appareils électroménagers avec une interface graphique, alors essayez-le.
Envoyez des commandes de contrôle aux appareils électroménagers avec Python. Étant donné qu'il est presque identique à un programme client de communication socket général, l'exemple de code est affiché en premier. Si vous exécutez l'exemple de code avec MoekadenRoom en cours d'exécution, les voyants de l'émulateur s'allumeront.
echonet_lite.py
import socket
import binascii
def send_command():
#Adresse IP du PC exécutant Moekaden Room (changez chacune d'elles)
ip = '192.168.0.10'
#ECHONET Lite utilise le port UDP 3610
ECHONETport = 3610
format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
data_format = {
format_echonet_lite[0]: '1081',
format_echonet_lite[1]: '0000',
format_echonet_lite[2]: '05FF01',
format_echonet_lite[3]: '029001',
format_echonet_lite[4]: '60',
format_echonet_lite[5]: '01',
format_echonet_lite[6]: '80',
format_echonet_lite[7]: '01',
format_echonet_lite[8]: '30' #Vous pouvez éteindre les lumières en passant à 31
}
frame = ''
for key in format_echonet_lite:
frame += data_format[key]
#Convertir une chaîne hexadécimale en chaîne d'octets
msg = binascii.unhexlify(frame)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(msg, (ip, ECHONETport))
if __name__ == '__main__':
send_command();
Ceci est une explication de l'exemple de code. Créez un message à transférer en fonction du format de la trame ECHONET Lite. Veuillez vous reporter à la page suivante pour l'explication du cadre.
Reportez-vous à la page ci-dessus et entrez les valeurs dans ʻEHD, TID, SEOJ, DEOJ, ESV, OPC, EPC, PDC, EDT`. Veuillez consulter la page ci-dessus pour plus de détails. La valeur est un nombre hexadécimal.
Composant de message | Valeur (description) |
---|---|
ETD | 1081 (Type de protocole ECHONET, 1081 est ECHONET Lite) |
TID | 0000 (Un ID pour associer une demande à une réponse, qui peut être n'importe quelle valeur numérique.) |
SEOJ | 05FF01 (Indique le périphérique source. La valeur du contrôleur est entrée. référence:Règlements détaillés pour les objets de l'appareil) |
DEOJ | 029001 (Indique l'appareil de destination (éclairage). Je mets la valeur de l'éclairage) |
ESV | 60 (60 est une commande de contrôle sans réponse de la destination) |
OPC | 01 (Indique le nombre de propriétés à traiter dans un message) |
EPC | 80 (Spécifiez la propriété.MoekadenRoomLisez-moi.RechercherLightningEPCdemd) |
PDC | 01 (Indique le nombre d'octets dans EDT. 01 car il n'y a qu'un seul contenu de contrôle) |
EDT | 30 (Entrez la valeur en fonction du nom correct spécifié par EPC.MoekadenRoomLisez-moi.RecherchezEDTpourallumerlalumièredemd) |
La commande de contrôle précédente ne répondait pas. Ici, nous enverrons une commande de contrôle et recevrons une réponse de l'appareil électroménager. Comme précédemment, allumez la lampe de l'émulateur. La différence est que vous recevez une réponse. En d'autres termes, nous avons créé précédemment uniquement le programme client de communication socket, mais ici, nous devons également créer le programme serveur de communication socket afin de recevoir la réponse. Nous allons expliquer deux méthodes, l'une consiste à séparer les fichiers pour le programme client et le programme serveur et à les exécuter séparément, et l'autre consiste à les combiner en un seul fichier. Tout d'abord, je vais expliquer à partir du premier simple.
Le programme client est presque le même qu'avant. C'est la valeur de trame de ECHONET Lite qui est modifiée. En définissant la valeur de ʻESV sur
61`, vous pouvez envoyer une commande de contrôle avec une réponse.
echonet_lite.py
import socket
import binascii
def send_command():
#Adresse IP du PC exécutant Moekaden Room (changez chacune d'elles)
ip = '192.168.0.10'
#ECHONET Lite utilise le port UDP 3610
ECHONETport = 3610
format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
data_format = {
format_echonet_lite[0]: '1081',
format_echonet_lite[1]: '0000',
format_echonet_lite[2]: '05FF01',
format_echonet_lite[3]: '029001',
format_echonet_lite[4]: '61', #Changer le programme précédent (60 → 61)
format_echonet_lite[5]: '01',
format_echonet_lite[6]: '80',
format_echonet_lite[7]: '01',
format_echonet_lite[8]: '30'
}
frame = ''
for key in format_echonet_lite:
frame += data_format[key]
#Convertir une chaîne hexadécimale en chaîne d'octets
msg = binascii.unhexlify(frame)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(msg, (ip, ECHONETport))
if __name__ == '__main__':
send_command();
Le programme qui reçoit la réponse n'est pas différent d'un programme de serveur de communication socket général. Cependant, comme la réponse est reçue sous forme de chaîne d'octets, elle doit être convertie en chaîne de caractères hexadécimaux. Puisque les données reçues au moment de la réponse sont au même format que la trame envoyée précédemment, le contenu peut être lu en le convertissant en une chaîne de caractères hexadécimaux.
socket_server.py
import socket
import binascii
def receive_state():
ECHONETport = 3610
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('', ECHONETport))
data, addr = sock.recvfrom(4096)
#Convertir une chaîne d'octets en chaîne hexadécimale
data = str(binascii.hexlify(data), 'utf-8')
print(data)
if __name__ == '__main__':
receive_state()
Commencez par démarrer MoekadenRoom et exécutez le programme serveur, puis le programme client dans cet ordre. À la suite de l'exécution du programme serveur, la sortie suivante sera obtenue.
$ python3 socket_server.py
#Résultat de sortie
1081000002900105ff0171018000
Lorsque le résultat de sortie est lu, ce sera comme suit. A partir de la valeur de ʻEPC`, nous pouvons voir que c'est une réponse à la commande de contrôle.
Composant de message | Valeur (description) |
---|---|
ETD | 1081 (Type de protocole ECHONET, 1081 est ECHONET Lite) |
TID | 0000 (Un ID pour associer une demande à une réponse, qui peut être n'importe quelle valeur numérique.) |
SEOJ | 029001 (Appareil source: éclairage) |
DEOJ | 05FF01 (Périphérique de destination: contrôleur) |
ESV | 71 (La réponse à 61 est renvoyée avec 71) |
OPC | 01 (Nombre de propriétés à traiter dans un message) |
EPC | 80 (Spécification de la propriété. EPC éclair) |
PDC | 00 (Nombre d'octets dans EDT. 00 parce que ce n'est pas le contrôle) |
EDT | Ciel(EDT n'est pas spécifié car il ne s'agit pas d'un contrôle) |
Dans le programme serveur, il attend de recevoir les données (blocage), donc le traitement s'arrête à cette partie. Pour traiter le programme client en même temps, il est préférable d'utiliser thread
. Le programme serveur est traité dans un thread distinct.
Pour passer le traitement à un autre thread, créez une instance de la classe Thread
et spécifiez la fonction que vous souhaitez traiter dans un autre thread dans l'argument target
. Démarrez thread
avec la méthode start
.
th = Thread(target=function)
th.start()
Maintenant, voici l'exemple de code. C'est un programme qui envoie une commande de contrôle à l'éclairage et renvoie une réponse 3 secondes après l'exécution du programme.
echonet_lite.py
import socket
import binascii
from threading import Thread
import time
def send_command():
#Adresse IP du PC exécutant Moekaden Room (changez chacune d'elles)
ip = '192.168.0.10'
ECHONETport = 3610
format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
data_format = {
format_echonet_lite[0]: '1081',
format_echonet_lite[1]: '0000',
format_echonet_lite[2]: '05FF01',
format_echonet_lite[3]: '029001',
format_echonet_lite[4]: '61',
format_echonet_lite[5]: '01',
format_echonet_lite[6]: '80',
format_echonet_lite[7]: '01',
format_echonet_lite[8]: '30', #Vous pouvez éteindre les lumières en passant à 31
}
frame = ''
for key in format_echonet_lite:
frame += data_format[key]
msg = binascii.unhexlify(frame)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(msg, (ip, ECHONETport))
print('send')
def receive_state():
ECHONETport = 3610
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('', ECHONETport))
data, addr = sock.recvfrom(4096)
data = str(binascii.hexlify(data), 'utf-8')
print('receive')
print(data)
if __name__ == '__main__':
th = Thread(target=receive_state)
th.start()
time.sleep(3)
send_command()
En tant que contrôle des appareils ménagers, nous essaierons d'acquérir l'état actuel des appareils ménagers. En plus des commandes de contrôle (avec / sans réponse) expliquées ci-dessus, les requêtes d'état expliquées ici sont les principaux types de contrôle d'appareils ménagers. Puisqu'il y a également une réponse à la demande d'état, il est nécessaire de créer à la fois un programme client et un programme serveur pour la communication par socket. Dans la demande d'état, la valeur de «ESV» au moment de la transmission est «62», et la valeur de «ESV» au moment de la réponse est «72». De plus, puisqu'il ne s'agit pas d'une commande de contrôle, la valeur de «PDC» est «00» et la valeur de «EDT» est «vide». Basé sur ceux-ci, l'exemple de code est illustré ci-dessous. Dans l'exemple de code, la réponse est sortie toutes les 5 secondes. Lors de l'exécution de l'exemple de code, cliquez sur le bouton d'éclairage de Moekaden Room pour activer / désactiver. La valeur EDT de sortie doit changer.
echonet_lite.py
import socket
import binascii
from threading import Thread
import time
def receive_state():
ECHONETport = 3610
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('', ECHONETport))
while True:
data, addr = sock.recvfrom(4096)
data = str(binascii.hexlify(data), 'utf-8')
print(data)
def confirm_state():
ip = '192.168.0.10'
ECHONETport = 3610
format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
data_format = {
format_echonet_lite[0]: '1081',
format_echonet_lite[1]: '0000',
format_echonet_lite[2]: '05FF01',
format_echonet_lite[3]: '029001',
format_echonet_lite[4]: '62',
format_echonet_lite[5]: '01',
format_echonet_lite[6]: '80',
format_echonet_lite[7]: '00',
format_echonet_lite[8]: ''
}
frame = ''
for key in format_echonet_lite:
frame += data_format[key]
msg = binascii.unhexlify(frame)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
sock.sendto(msg, (ip, ECHONETport))
time.sleep(5)
if __name__ == '__main__':
th1 = Thread(target=receive_state)
th1.start()
confirm_state()
Enfin, utilisez Flask pour configurer un serveur HTTP et contrôler les appareils ménagers. La gestion de l'état des appareils électroménagers avec un serveur HTTP est une méthode souvent utilisée pour contrôler plusieurs appareils électroménagers. En appliquant cela, il peut être utilisé pour contrôler les appareils ménagers depuis Android et également pour collecter des données IoT. Tout d'abord, examinons un programme simple qui contrôle Flask et les appareils ménagers comme une étape vers cette fin. Ici, queue
est utilisé car plusieurs threads sont utilisés. queue
est une structure qui permet de récupérer les données dans l'ordre dans lequel elles ont été entrées. L'utilisation de base de «queue» est la suivante.
#Créer une file d'attente
q = Queue();
#Insérez des données dans la file d'attente avec "put" et récupérez-les avec "get"
q_msg = {"name": "foge"}
q.put(q_msg)
q_result = q.get()
print(q_result["name"])
Maintenant, c'est un peu compliqué, mais voici un exemple de code. La procédure pour exécuter le programme est de démarrer d'abord MoekadenRoom, puis d'exécuter le programme et enfin d'accéder à http: //192.168.0.10: 5000
. C'est un programme qui acquiert l'état des appareils électroménagers toutes les 5 secondes et les émet, et donne des commandes MARCHE / ARRÊT aux appareils électroménagers toutes les 10 secondes.
app.py
import socket
import binascii
from flask import Flask
from queue import Queue
from threading import Thread
import time
app = Flask(__name__)
q = Queue()
state_q = Queue()
@app.route('/', methods=['GET'])
def home():
th1 = Thread(target=receive_state)
th1.start()
th2 = Thread(target=confirm_state)
th2.start()
th3 = Thread(target=send_command)
th3.start()
return 'This is a Sample Webpage'
def send_command():
while True:
time.sleep(10)
if state_q.empty():
state = '30'
else:
q_state_msg = state_q.get()
q_state = q_state_msg['state']
if q_state == '30':
state = '31'
else:
state = '30'
#Adresse IP du PC exécutant Moekaden Room (changez chacune d'elles)
ip = '192.168.0.10'
ECHONETport = 3610
format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
data_format = {
format_echonet_lite[0]: '1081',
format_echonet_lite[1]: '0000',
format_echonet_lite[2]: '05FF01',
format_echonet_lite[3]: '029001',
format_echonet_lite[4]: '60',
format_echonet_lite[5]: '01',
format_echonet_lite[6]: '80',
format_echonet_lite[7]: '01',
format_echonet_lite[8]: state # state: 30 or 31 (ON or OFF)
}
frame = ''
for key in format_echonet_lite:
frame += data_format[key]
msg = binascii.unhexlify(frame)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(msg, (ip, ECHONETport))
print('send_command')
def receive_state():
ECHONETport = 3610
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('', ECHONETport))
while True:
data, addr = sock.recvfrom(4096)
data = str(binascii.hexlify(data), 'utf-8')
queue_msg = q.get()
if state_q.empty():
data_format = queue_msg['format']
state = get_state(data, data_format)
q_state_msg = {
'state': state
}
state_q.put(q_state_msg)
print(data)
def confirm_state():
ip = '192.168.0.10'
ECHONETport = 3610
format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
data_format = {
format_echonet_lite[0]: '1081',
format_echonet_lite[1]: '0000',
format_echonet_lite[2]: '05FF01',
format_echonet_lite[3]: '029001',
format_echonet_lite[4]: '62',
format_echonet_lite[5]: '01',
format_echonet_lite[6]: '80',
format_echonet_lite[7]: '00',
format_echonet_lite[8]: ''
}
frame = ''
for key in format_echonet_lite:
frame += data_format[key]
msg = binascii.unhexlify(frame)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
sock.sendto(msg, (ip, ECHONETport))
print('send_confirm_state')
queue_msg = {
'format': data_format
}
q.put(queue_msg)
time.sleep(5)
#Une fonction pour obtenir la valeur EDT à partir d'un cadre de chaîne hexadécimal
def get_state(data, data_format):
data_list = []
data_pos = 0
for value in data_format.values():
element_size = len(value)
if element_size == 0:
element_size = 2
data_list.append(data[data_pos: data_pos + element_size])
data_pos += element_size
state = data_list[8]
return state
if __name__ == '__main__':
app.run(host='0.0.0.0', debug=True)
J'ai utilisé ECHONET Lite avec Python pour contrôler les appareils ménagers (émulateurs). Cette fois, nous nous sommes concentrés sur l'éclairage contrôlable le plus simple. Sur la base de cet exemple de code, essayez de réaliser divers contrôles pour les appareils ménagers. Il n'y a pas beaucoup d'informations sur le contrôle des appareils électroménagers à l'aide de Python, augmentons donc petit à petit le nombre d'articles Qiita. (Jusqu'à ce que vous trouviez l'interface la plus adaptée pour contrôler vos appareils électroménagers ...)