Es ist praktisch, Lambda durch Verknüpfen von AWS-Ressourcen zu verwenden, aber das Ereignis ist jedes Mal kompliziert. Dieses Mal möchte ich beim Aufnehmen von DynamoDB Stream in Python in der Lage sein, etwas anderes als die Hauptverarbeitung präzise zu schreiben.
DynamoDB-Streams und AWS Lambda-Trigger - Amazon DynamoDB
Der Inhalt dieses Artikels ist in PyPI als Bibliothek registriert.
Es wird einfacher, wenn Sie die Verarbeitung hier verwenden.
Ich wünschte, ich könnte den Einstiegspunkt so schreiben.
handler(Pläne)
def lambda_handler(event, context):
ds = DynamoStreamDispatcher(event)
ds.on_insert.append(lambda rec: print(rec.event_name)) # lambda OK
ds.on_remove.append(after_remove1)
ds.on_remove.append(after_remove2) #Mehrere Prozesse OK
ds.on_modify.append(after_modify)
ds.dispatch()
return True
Es fühlt sich an, als würde man einen Handler registrieren und versenden.
Vorerst habe ich den Inhalt, den ich schreiben wollte, in den zu Beginn beschlossenen lambda_handler implementiert.
lambda_function.py
from __future__ import print_function
import json
from boto3.dynamodb.types import TypeDeserializer
deser = TypeDeserializer()
print('Loading function')
class DeRecord:
##Deserialisierter Gegenstand
def __init__(self, rec):
self.event_name = rec['eventName']
self.old = self._desi(rec['dynamodb'].get('OldImage'))
self.new = self._desi(rec['dynamodb'].get('NewImage'))
def _desi(self, image):
d = {}
if image:
for key in image:
d[key] = deser.deserialize(image[key])
return d
class DynamoStreamDispatcher:
def __init__(self, event):
self.on_insert = []
self.on_remove = []
self.on_modify = []
self.records = []
for r in event['Records']:
#Der Datensatz wird zu einem Diktat verarbeitet.
self.records.append(DeRecord(r))
self.raw = event
def dispatch(self):
"""
synced dispatcher
"""
results = []
for r in self.records:
try:
for runner in getattr(self, 'on_' + r.event_name.lower()):
results.append(runner(r))
except AttributeError:
print("Unknown event " + r.event_name)
continue
return results
##Von hier aus ein Funktionsbeispiel der Verarbeitung für einzelne Lambda. Das Argument ist ein verarbeiteter Datensatz
def after_remove1(rec):
print("deleted")
return None
def after_remove2(rec):
print(rec.old)
return None
def after_modify(rec):
print("key updated...")
print(rec.old['Message'])
print(rec.new['Message'])
return None
def lambda_handler(event, context):
ds = DynamoStreamDispatcher(event)
ds.on_insert.append(lambda rec: print(rec.event_name))
ds.on_remove.append(after_remove1)
ds.on_remove.append(after_remove2)
ds.on_modify.append(after_modify)
ds.dispatch()
return True
Versuchen Sie, Dynamo DB Update (* am Ende angehängt) über die Beispielereignisvorlage auszuführen.
START RequestId: 6ed79996-0ecc-11e7-8985-db0ca21254c3 Version: $LATEST
INSERT
key updated...
New item!
This item has changed
deleted
{u'Message': u'This item has changed', u'Id': Decimal('101')}
END RequestId: 6ed79996-0ecc-11e7-8985-db0ca21254c3
Jede registrierte Funktion wird ausgeführt und OK.
Es ist fast meine eigene Bibliothek, aber wenn es nichts Ähnliches wie PyPI gibt, kann ich es registrieren und verwenden. Danach möchte ich die Unterschiede speichern und so weiter.
Anhang: Beispielereignis für DynamoDB Update
python
{
"Records": [
{
"eventID": "1",
"eventVersion": "1.0",
"dynamodb": {
"Keys": {
"Id": {
"N": "101"
}
},
"NewImage": {
"Message": {
"S": "New item!"
},
"Id": {
"N": "101"
}
},
"StreamViewType": "NEW_AND_OLD_IMAGES",
"SequenceNumber": "111",
"SizeBytes": 26
},
"awsRegion": "us-west-2",
"eventName": "INSERT",
"eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
"eventSource": "aws:dynamodb"
},
{
"eventID": "2",
"eventVersion": "1.0",
"dynamodb": {
"OldImage": {
"Message": {
"S": "New item!"
},
"Id": {
"N": "101"
}
},
"SequenceNumber": "222",
"Keys": {
"Id": {
"N": "101"
}
},
"SizeBytes": 59,
"NewImage": {
"Message": {
"S": "This item has changed"
},
"Id": {
"N": "101"
}
},
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"awsRegion": "us-west-2",
"eventName": "MODIFY",
"eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
"eventSource": "aws:dynamodb"
},
{
"eventID": "3",
"eventVersion": "1.0",
"dynamodb": {
"Keys": {
"Id": {
"N": "101"
}
},
"SizeBytes": 38,
"SequenceNumber": "333",
"OldImage": {
"Message": {
"S": "This item has changed"
},
"Id": {
"N": "101"
}
},
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"awsRegion": "us-west-2",
"eventName": "REMOVE",
"eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
"eventSource": "aws:dynamodb"
}
]
}
Referenz:
Recommended Posts