Berücksichtigen Sie bei der Verarbeitung von DynamoDB Stream mit Lambda (Python) die allgemeine Vorverarbeitung.

Es ist praktisch, Lambda durch Verknüpfen von AWS-Ressourcen zu verwenden, aber das Ereignis ist jedes Mal kompliziert. Dieses Mal möchte ich beim Aufnehmen von DynamoDB Stream in Python in der Lage sein, etwas anderes als die Hauptverarbeitung präzise zu schreiben.

DynamoDB-Streams und AWS Lambda-Trigger - Amazon DynamoDB

Der Inhalt dieses Artikels ist in PyPI als Bibliothek registriert.

Gemeinsame Verarbeitungsspezifikationen

Es wird einfacher, wenn Sie die Verarbeitung hier verwenden.

Ich wünschte, ich könnte den Einstiegspunkt so schreiben.

handler(Pläne)


def lambda_handler(event, context):
    ds = DynamoStreamDispatcher(event)
    ds.on_insert.append(lambda rec: print(rec.event_name)) # lambda OK
    ds.on_remove.append(after_remove1)
    ds.on_remove.append(after_remove2) #Mehrere Prozesse OK
    ds.on_modify.append(after_modify)


    ds.dispatch()
    return True

Es fühlt sich an, als würde man einen Handler registrieren und versenden.

Ich werde es grob machen

Vorerst habe ich den Inhalt, den ich schreiben wollte, in den zu Beginn beschlossenen lambda_handler implementiert.

lambda_function.py


from __future__ import print_function

import json
from boto3.dynamodb.types import TypeDeserializer
deser = TypeDeserializer()

print('Loading function')


class DeRecord:
    ##Deserialisierter Gegenstand
    def __init__(self, rec):
        self.event_name = rec['eventName']
        self.old = self._desi(rec['dynamodb'].get('OldImage'))
        self.new = self._desi(rec['dynamodb'].get('NewImage'))

    def _desi(self, image):
        d = {}
        if image:
            for key in image:
                d[key] = deser.deserialize(image[key])
        return d

        
class DynamoStreamDispatcher:
    def __init__(self, event):
        self.on_insert = []
        self.on_remove = []
        self.on_modify = []
        self.records   = []
        for r in event['Records']:
            #Der Datensatz wird zu einem Diktat verarbeitet.
            self.records.append(DeRecord(r))

        self.raw = event

    def dispatch(self):
        """
        synced dispatcher
        """
        results = []
        for r in self.records:
            try:
                for runner in getattr(self, 'on_' + r.event_name.lower()):
                    results.append(runner(r))
            except AttributeError:
                print("Unknown event " + r.event_name)
                continue
                    
        return results


##Von hier aus ein Funktionsbeispiel der Verarbeitung für einzelne Lambda. Das Argument ist ein verarbeiteter Datensatz
def after_remove1(rec):
    print("deleted")
    return None

def after_remove2(rec):
    print(rec.old)
    return None


def after_modify(rec):
    print("key updated...")
    print(rec.old['Message'])
    print(rec.new['Message'])
    return None


def lambda_handler(event, context):
    ds = DynamoStreamDispatcher(event)
    ds.on_insert.append(lambda rec: print(rec.event_name))
    ds.on_remove.append(after_remove1)
    ds.on_remove.append(after_remove2)
    ds.on_modify.append(after_modify)

    ds.dispatch()
    return True

Versuchen Sie, Dynamo DB Update (* am Ende angehängt) über die Beispielereignisvorlage auszuführen.

START RequestId: 6ed79996-0ecc-11e7-8985-db0ca21254c3 Version: $LATEST
INSERT
key updated...
New item!
This item has changed
deleted
{u'Message': u'This item has changed', u'Id': Decimal('101')}
END RequestId: 6ed79996-0ecc-11e7-8985-db0ca21254c3

Jede registrierte Funktion wird ausgeführt und OK.

Es ist fast meine eigene Bibliothek, aber wenn es nichts Ähnliches wie PyPI gibt, kann ich es registrieren und verwenden. Danach möchte ich die Unterschiede speichern und so weiter.


Anhang: Beispielereignis für DynamoDB Update

python


{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "NewImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "2",
      "eventVersion": "1.0",
      "dynamodb": {
        "OldImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "SequenceNumber": "222",
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "SizeBytes": 59,
        "NewImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "MODIFY",
      "eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "3",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "SizeBytes": 38,
        "SequenceNumber": "333",
        "OldImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "REMOVE",
      "eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
      "eventSource": "aws:dynamodb"
    }
  ]
}

Referenz:

Recommended Posts

Berücksichtigen Sie bei der Verarbeitung von DynamoDB Stream mit Lambda (Python) die allgemeine Vorverarbeitung.
Bearbeiten von DynamoDB-Daten mit Lambda (Node & Python)
Verwenden Sie DynamoDB mit Python
Bildverarbeitung mit Python
Schreiben Sie mit Lambda (Python, JavaScript) mehrere Datensätze in DynamoDB.
100 Sprachverarbeitungsklopfen mit Python 2015
Betreiben Sie TwitterBot mit Lambda, Python
"Apple-Verarbeitung" mit OpenCV3 + Python3
Akustische Signalverarbeitung mit Python (2)
Bildverarbeitung mit Python (Teil 1)
Bildverarbeitung mit Python (3)
Fehler beim Spielen mit Python
[Python] Bildverarbeitung mit Scicit-Image
[Python] Einfache Parallelverarbeitung mit Joblib
100 Sprachverarbeitungsklopfen mit Python (Kapitel 1)
Gesichtserkennung mit Lambda (Python) + Erkennung
[Verarbeitung natürlicher Sprache] Vorverarbeitung mit Japanisch
100 Sprachverarbeitungsklopfen mit Python (Kapitel 3)
Die Bildverarbeitung mit Python 100 klopft an die Binärisierung Nr. 3
Python String Processing Map und Lambda
Benachrichtigen Sie HipChat mit AWS Lambda (Python)
100 Bildverarbeitung mit Python Knock # 2 Graustufen
Wenn matplotlib nicht mit python2.7 funktioniert
Verwenden Sie PostgreSQL mit Lambda (Python + psycopg2)
Bei Verwendung von MeCab mit virtualenv python
Vorsichtsmaßnahmen bei Verwendung von sechs mit Python 2.5
[Python] Format, wenn to_csv mit Pandas
ImportError beim Versuch, das gcloud-Paket mit der AWS Lambda Python-Version zu verwenden
Grundlagen der binärisierten Bildverarbeitung durch Python
Bildverarbeitung mit Python 100 Knock # 10 Medianfilter
[AWS] Verwenden von INI-Dateien mit Lambda [Python]
Snippet für die Vollbit-Suche mit Python
Periodische Ausführungsverarbeitung bei Verwendung von tkinter [Python3]
100 Bildverarbeitung mit Python Knock # 8 Max Pooling
Hinweise beim Erstellen einer Umgebung mit Python
Führen Sie regelmäßig eine beliebige Verarbeitung mit Python Twisted durch
Zu beachtende Punkte bei der Lösung von DP-Problemen mit Python
Lassen Sie Heroku die Hintergrundverarbeitung mit Python durchführen
100 Sprachverarbeitungsklopfen mit Python (Kapitel 2, Teil 2)
Bildverarbeitung mit Python & OpenCV [Tonkurve]
3. Verarbeitung natürlicher Sprache durch Python 2-1. Netzwerk für das gleichzeitige Auftreten
Bildverarbeitung mit Python 100 Knock # 12 Bewegungsfilter
3. Verarbeitung natürlicher Sprache durch Python 1-1. Word N-Gramm
Allgemeine Verarbeitung bei der Anforderung einer klassenbasierten Ansicht
100 Sprachverarbeitungsklopfen mit Python (Kapitel 2, Teil 1)
Zeichnen mit Matrix-Reinventor von Python Image Processing-
Verarbeiten Sie Bilder in Python ganz einfach mit Pillow
Stellen Sie mit AWS Lambda Python eine Verbindung zu s3 her
Die Bildverarbeitung mit Python 100 führt zu einem durchschnittlichen Pooling von # 7
Streame Redmine-Updates mit Python auf Hipchat
Leichte Bildverarbeitung mit Python x OpenCV
Bildverarbeitung mit Lambda + OpenCV (graue Bilderzeugung)
Einfache REST-API mit API Gateway / Lambda / DynamoDB
Versuchen Sie, Python: Lambda zuzuweisen oder zu wechseln
Bildverarbeitung mit Python 100 Knock # 9 Gauß-Filter
Versuchen Sie die Bildverarbeitung mit Python, wenn Sie bei einer Hochzeit um Unterhaltung gebeten werden
Was ich getan habe, als ich mit Lambda Python im Zeitlimit steckte