Registrieren Sie Dinge in AWS IoT mit dem AWS IoT Python SDK. Wenn es viele Dinge gibt, ist es schwierig, sich jedes Mal auf der Konsole zu registrieren.
Gehen Sie zur Registrierung des Artikels wie folgt vor.
--Erstelle eine Gruppe, zu der Dinge gehören. (Verfahren weggelassen)
--Erstellen Sie eine Richtlinie zum Anhängen an das Zertifikat. (Verfahren weggelassen)
import boto3
import json
import os
class AWSIoT():
#Zertifikat, Schlüsseldateiname
FILENAME_PUBLIC_KEY = 'public_key.pem'
FILENAME_PRIVATE_KEY = 'private_key.pem'
FILENAME_CERT = 'cert.pem'
def __init__(self, dirpath_cert):
#Instanziieren Sie die zu verwendende Klasse
self.client_iot = boto3.client('iot')
self.client_iotdata = boto3.client('iot-data')
#Verzeichnis zum Speichern von Zertifikaten
self.DIRPATH_CERT = dirpath_cert
return
def enroll_thing(self, thing_name, dict_attr, group_name, property_desired, property_reported, policy):
'''
Registrieren Sie Dinge in AWS IoT
'''
#Registrieren Sie Dinge in AWS IoT ("Attribut"Registrieren Sie Informationen zu Dingen in
self.__create_thing(thing_name, dict_attr)
#Hinzufügen registrierter Elemente zur Gruppe
self.client_iot.add_thing_to_thing_group(thingGroupName=group_name, thingName=thing_name)
#Registrieren Sie Informationen im Geräteschatten
self.__update_shadow(thing_name, property_desired, property_reported)
#Gerätzertifikat / -schlüssel ausstellen und speichern
response = self.__create_keys_and_cert(thing_name)
#Richtlinie an Zertifikat anhängen
self.client_iot.attach_policy(policyName=policy, target=response['certificateArn'])
#Verknüpfen Sie das Zertifikat mit dem Gerät
self.client_iot.attach_thing_principal(thingName=thing_name, principal=response['certificateArn'])
return
def __create_thing(self, thingname, dict_attr):
'''
Registrieren Sie Dinge in AWS IoT ("Attribut"Registrieren Sie Informationen zu Dingen in
'''
#Registrierungsinformationen generieren (Attribut)
attributePayload = self.__create_attribute_payload(dict_attr)
#Dinge registrieren
self.client_iot.create_thing(
thingName=thingname,
attributePayload=attributePayload
)
return
def __create_attribute_payload(self, dict_attr):
'''
Registrierungsinformationen generieren (Attribut)
'''
attributePayload = {
'attributes': dict_attr
}
return attributePayload
def __update_shadow(self, thing_name, property_desired, property_reported):
'''
Registrieren Sie Informationen im Geräteschatten
'''
#Formatieren von Versionsinformationen zum Schreiben in den Geräteschatten
payload = self.__create_payload(property_desired, property_reported)
#Registrieren Sie Informationen im Geräteschatten
self.client_iotdata.update_thing_shadow(
thingName=thing_name,
payload=payload
)
return
def __create_payload(self, property_desired, property_reported):
'''
Formatieren von Versionsinformationen zum Schreiben in den Geräteschatten
'''
payload = json.dumps({'state':
{"desired": {"property": property_desired},
"reported": {"property": property_reported}}})
return payload
def __create_keys_and_cert(self, thing_name):
'''
Gerätzertifikat / -schlüssel ausstellen und speichern
'''
#Zertifikat und Schlüssel ausstellen
response = self.client_iot.create_keys_and_certificate(setAsActive=True)
#Generieren Sie einen Zielverzeichnispfad
dirpath_save = self.DIRPATH_CERT + thing_name + '/'
#In Datei schreiben und speichern
self.__write_to_file(dirpath_save, self.FILENAME_PUBLIC_KEY, response['keyPair']['PublicKey'])
self.__write_to_file(dirpath_save, self.FILENAME_PRIVATE_KEY, response['keyPair']['PrivateKey'])
self.__write_to_file(dirpath_save, self.FILENAME_CERT, response['certificatePem'])
return response
def __write_to_file(self, dirpath, filename, contents):
'''
In Datei schreiben
'''
os.makedirs(dirpath, exist_ok=True)
filepath = dirpath + filename
with open(filepath, mode='w') as f:
f.write(contents)
return
#Der Name der Sache
thing_name = 'ThingName'
#Attribut der Sache (Attributschlüssel:Wert)
dict_attr = {'BuildingName':'hogehoge_building', 'Floor':'6'}
#Der Name der Gruppe, zu der das Ding gehört
group_name = 'hogehoge_group'
#Informationen, die im Geräteschatten registriert werden sollen
temp_desired = 26
temp_reported = 22
#Richtlinie zum Anhängen an das Zertifikat
policy = 'policy_thermometer'
#Pfad des Verzeichnisses zum Speichern von Zertifikat und Schlüssel
dirpath_cert = './cert/'
awsiot = AWSIoT(dirpath_cert)
awsiot.enroll_thing(thing_name, dict_attr, group_name, temp_desired, temp_reported, policy)
Das Gerät wurde registriert. Die Attribute werden auch korrekt registriert.
Der Schatten ist auch korrekt registriert. ("Delta" wird automatisch erstellt. Details werden weggelassen.)
Das Zertifikat ist auch korrekt verknüpft,
Dem Zertifikat ist eine Richtlinie beigefügt.
Ich bin ein sehr Anfänger, daher würde ich es begrüßen, wenn Sie auch die kleinsten Dinge hervorheben und kommentieren könnten. Ich mache Twitter → @shin_job
Recommended Posts