Il est pratique d'utiliser Lambda en liant les ressources AWS, mais l'événement est compliqué à chaque fois. Cette fois-ci, lors de la prise en charge de DynamoDB Stream en Python, je veux être capable d'écrire autre chose que le traitement principal de manière concise.
Flux DynamoDB et déclencheurs AWS Lambda - Amazon DynamoDB
Le contenu de cet article est enregistré dans PyPI en tant que bibliothèque.
Ce sera plus facile si vous utilisez le traitement ici.
--Je veux enregistrer la fonction correspondante (y compris lambda) et l'exécuter pour chaque enregistrement.
Donc, j'aimerais pouvoir écrire le point d'entrée comme ça.
handler(des plans)
def lambda_handler(event, context):
ds = DynamoStreamDispatcher(event)
ds.on_insert.append(lambda rec: print(rec.event_name)) # lambda OK
ds.on_remove.append(after_remove1)
ds.on_remove.append(after_remove2) #Plusieurs processus OK
ds.on_modify.append(after_modify)
ds.dispatch()
return True
C'est comme enregistrer un gestionnaire et distribuer.
Pour le moment, j'ai implémenté le contenu que je voulais écrire dans le lambda_handler qui avait été décidé au début.
lambda_function.py
from __future__ import print_function
import json
from boto3.dynamodb.types import TypeDeserializer
deser = TypeDeserializer()
print('Loading function')
class DeRecord:
##Article désérialisé
def __init__(self, rec):
self.event_name = rec['eventName']
self.old = self._desi(rec['dynamodb'].get('OldImage'))
self.new = self._desi(rec['dynamodb'].get('NewImage'))
def _desi(self, image):
d = {}
if image:
for key in image:
d[key] = deser.deserialize(image[key])
return d
class DynamoStreamDispatcher:
def __init__(self, event):
self.on_insert = []
self.on_remove = []
self.on_modify = []
self.records = []
for r in event['Records']:
#L'enregistrement est transformé en un dict.
self.records.append(DeRecord(r))
self.raw = event
def dispatch(self):
"""
synced dispatcher
"""
results = []
for r in self.records:
try:
for runner in getattr(self, 'on_' + r.event_name.lower()):
results.append(runner(r))
except AttributeError:
print("Unknown event " + r.event_name)
continue
return results
##À partir de là, un exemple de fonction de traitement pour Lambda individuel. L'argument est un enregistrement traité
def after_remove1(rec):
print("deleted")
return None
def after_remove2(rec):
print(rec.old)
return None
def after_modify(rec):
print("key updated...")
print(rec.old['Message'])
print(rec.new['Message'])
return None
def lambda_handler(event, context):
ds = DynamoStreamDispatcher(event)
ds.on_insert.append(lambda rec: print(rec.event_name))
ds.on_remove.append(after_remove1)
ds.on_remove.append(after_remove2)
ds.on_modify.append(after_modify)
ds.dispatch()
return True
Essayez d'exécuter Dynamo DB Update (* joint à la fin) à partir du modèle d'événement Exemple.
START RequestId: 6ed79996-0ecc-11e7-8985-db0ca21254c3 Version: $LATEST
INSERT
key updated...
New item!
This item has changed
deleted
{u'Message': u'This item has changed', u'Id': Decimal('101')}
END RequestId: 6ed79996-0ecc-11e7-8985-db0ca21254c3
Chaque fonction enregistrée est exécutée et OK.
C'est presque ma propre bibliothèque, mais s'il n'y a rien de similaire à PyPI, je pense que je peux m'enregistrer et l'utiliser. Après cela, je veux stocker les différences et ainsi de suite.
Annexe: exemple d'événement de mise à jour DynamoDB
python
{
"Records": [
{
"eventID": "1",
"eventVersion": "1.0",
"dynamodb": {
"Keys": {
"Id": {
"N": "101"
}
},
"NewImage": {
"Message": {
"S": "New item!"
},
"Id": {
"N": "101"
}
},
"StreamViewType": "NEW_AND_OLD_IMAGES",
"SequenceNumber": "111",
"SizeBytes": 26
},
"awsRegion": "us-west-2",
"eventName": "INSERT",
"eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
"eventSource": "aws:dynamodb"
},
{
"eventID": "2",
"eventVersion": "1.0",
"dynamodb": {
"OldImage": {
"Message": {
"S": "New item!"
},
"Id": {
"N": "101"
}
},
"SequenceNumber": "222",
"Keys": {
"Id": {
"N": "101"
}
},
"SizeBytes": 59,
"NewImage": {
"Message": {
"S": "This item has changed"
},
"Id": {
"N": "101"
}
},
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"awsRegion": "us-west-2",
"eventName": "MODIFY",
"eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
"eventSource": "aws:dynamodb"
},
{
"eventID": "3",
"eventVersion": "1.0",
"dynamodb": {
"Keys": {
"Id": {
"N": "101"
}
},
"SizeBytes": 38,
"SequenceNumber": "333",
"OldImage": {
"Message": {
"S": "This item has changed"
},
"Id": {
"N": "101"
}
},
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"awsRegion": "us-west-2",
"eventName": "REMOVE",
"eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
"eventSource": "aws:dynamodb"
}
]
}
référence: