Considering common pre-processing for DynamoDB Streams in Lambda (Python)

Lambda is convenient for wiring AWS resources together, but the event payload is tedious to unpack every time. This time, when consuming a DynamoDB Stream in Python, I want to be able to write everything other than the main processing concisely.

Reference: DynamoDB Streams and AWS Lambda Triggers - Amazon DynamoDB

The contents of this article have since been published on PyPI as a library.

Specification of the common processing

Things get much easier if the following is taken care of for you:

- Register functions (lambdas included) per event type and have them executed for every matching record.
  - Because each registered function is only called for its own event type, the handlers need no branching.
- Make the Item data easy to get at.
  - Stream records carry DynamoDB's usual type descriptors (e.g. {"S": "..."}), so convert them into a plain Python dict.
- (Optional) Exception handling
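The "convert to a plain Python dict" step is what boto3's `TypeDeserializer` does in the implementation later in this article. To show what that conversion involves, here is a minimal pure-Python sketch. `deserialize_value` and `deserialize_image` are hypothetical names, and the sketch covers only the common type tags (no binary types). Mapping `N` to `Decimal` mirrors what boto3 does. In real code, prefer `TypeDeserializer`.

```python
from decimal import Decimal


def deserialize_value(av):
    """Convert one DynamoDB attribute value, e.g. {"N": "101"}, to a Python value."""
    tag, val = next(iter(av.items()))  # each attribute value has exactly one type key
    if tag in ("S", "BOOL"):
        return val
    if tag == "N":
        return Decimal(val)  # numbers arrive as strings; Decimal avoids float rounding
    if tag == "NULL":
        return None
    if tag == "SS":
        return set(val)
    if tag == "NS":
        return {Decimal(n) for n in val}
    if tag == "L":
        return [deserialize_value(v) for v in val]
    if tag == "M":
        return {k: deserialize_value(v) for k, v in val.items()}
    raise ValueError("type not handled in this sketch: " + tag)


def deserialize_image(image):
    """Convert a whole NewImage/OldImage (possibly missing) into a plain dict."""
    return {k: deserialize_value(v) for k, v in (image or {}).items()}


print(deserialize_image({"Message": {"S": "New item!"}, "Id": {"N": "101"}}))
# {'Message': 'New item!', 'Id': Decimal('101')}
```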

So I would like to be able to write the entry point like this:

handler (planned)


def lambda_handler(event, context):
    ds = DynamoStreamDispatcher(event)
    ds.on_insert.append(lambda rec: print(rec.event_name))  # lambdas are OK
    ds.on_remove.append(after_remove1)
    ds.on_remove.append(after_remove2)  # several handlers per event are OK
    ds.on_modify.append(after_modify)

    ds.dispatch()
    return True

The idea is simply to register handlers and then dispatch.
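The same register-then-dispatch idea can also be expressed with a dictionary keyed by event name instead of one attribute per event type. This is a minimal sketch. `EventDispatcher` and its `on` method are hypothetical names, not the API of the PyPI library mentioned above. For brevity, the records here are raw stream records (dicts with an `eventName` key) rather than deserialized ones.

```python
class EventDispatcher:
    """Hypothetical dict-based dispatcher: handler lists keyed by eventName."""

    def __init__(self):
        # The three event names DynamoDB Streams emits
        self.handlers = {"INSERT": [], "MODIFY": [], "REMOVE": []}

    def on(self, event_name, func):
        # Raises KeyError for a misspelled event name at registration time
        self.handlers[event_name.upper()].append(func)
        return func  # returning func also allows use as a decorator helper

    def dispatch(self, records):
        results = []
        for rec in records:
            funcs = self.handlers.get(rec["eventName"])
            if funcs is None:
                print("Unknown event " + rec["eventName"])
                continue
            for func in funcs:  # registration order is preserved
                results.append(func(rec))
        return results
```

A failed lookup happens once per record, outside the handler calls, so an exception raised inside a handler is never confused with an unknown event type.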

A rough implementation

For now, I implemented just enough to make the lambda_handler sketched above work as written.

lambda_function.py


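from __future__ import print_function  # lets lambdas call print() on Python 2

from boto3.dynamodb.types import TypeDeserializer

# Converts DynamoDB-typed values such as {"N": "101"} into Python values
deser = TypeDeserializer()

print('Loading function')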


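class DeRecord:
    """A stream record whose images are deserialized into plain dicts."""
    def __init__(self, rec):
        self.event_name = rec['eventName']  # INSERT / MODIFY / REMOVE
        # Which images exist depends on the stream's StreamViewType;
        # a missing image becomes an empty dict.
        self.old = self._desi(rec['dynamodb'].get('OldImage'))
        self.new = self._desi(rec['dynamodb'].get('NewImage'))

    def _desi(self, image):
        d = {}
        if image:
            for key in image:
                d[key] = deser.deserialize(image[key])
        return d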

        
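class DynamoStreamDispatcher:
    def __init__(self, event):
        # One handler list per DynamoDB Streams eventName
        self.on_insert = []
        self.on_remove = []
        self.on_modify = []
        # Every record is converted into a DeRecord up front
        self.records = [DeRecord(r) for r in event['Records']]
        self.raw = event

    def dispatch(self):
        """
        Synchronous dispatcher: runs the handlers registered for each
        record's event type, in registration order, and returns their results.
        """
        results = []
        for r in self.records:
            # Look the handler list up once, so errors raised inside
            # a handler propagate instead of being reported as unknown events.
            runners = getattr(self, 'on_' + r.event_name.lower(), None)
            if runners is None:
                print("Unknown event " + r.event_name)
                continue
            for runner in runners:
                results.append(runner(r))

        return results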


# Sample handlers for this particular Lambda. Each one receives a DeRecord.
def after_remove1(rec):
    print("deleted")
    return None

def after_remove2(rec):
    print(rec.old)
    return None


def after_modify(rec):
    print("key updated...")
    print(rec.old['Message'])
    print(rec.new['Message'])
    return None


def lambda_handler(event, context):
    ds = DynamoStreamDispatcher(event)
    ds.on_insert.append(lambda rec: print(rec.event_name))
    ds.on_remove.append(after_remove1)
    ds.on_remove.append(after_remove2)
    ds.on_modify.append(after_modify)

    ds.dispatch()
    return True

Running it with the "DynamoDB Update" sample event template (attached at the end) gives:

START RequestId: 6ed79996-0ecc-11e7-8985-db0ca21254c3 Version: $LATEST
INSERT
key updated...
New item!
This item has changed
deleted
{u'Message': u'This item has changed', u'Id': Decimal('101')}
END RequestId: 6ed79996-0ecc-11e7-8985-db0ca21254c3

Each registered function was executed as expected.
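The run above assumes every handler succeeds. The specification listed exception handling as optional, and the dispatcher above simply lets an exception propagate. One possible approach is to run every handler, collect failures, and decide afterwards. This is a hedged sketch; `dispatch_safely` is a hypothetical helper. With a DynamoDB stream event source, re-raising from the Lambda function makes Lambda retry the batch by default, while swallowing the error means the records count as processed. Which behavior is right depends on whether the handlers are idempotent.

```python
import traceback


def dispatch_safely(records, handlers):
    """Run every handler for every record, collecting failures instead of stopping.

    records:  list of dicts with an "eventName" key (minimal shape for this sketch)
    handlers: dict mapping event name -> list of callables
    """
    results, errors = [], []
    for rec in records:
        for func in handlers.get(rec["eventName"], []):
            try:
                results.append(func(rec))
            except Exception as exc:  # log and keep going with the next handler
                traceback.print_exc()
                errors.append((rec, func, exc))
    return results, errors
```

The caller can then, for example, `raise` if `errors` is non-empty to get the batch retried, or just log them and return.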

This is essentially a personal library, but if nothing similar exists on PyPI, I may register it there and use it. Next, I would like to add features such as storing the differences.
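As for storing differences, one possible reading is a key-by-key comparison of `rec.old` and `rec.new` on MODIFY events. The helper `diff_images` is a hypothetical sketch of that idea, not part of the current code. It assumes both images have already been deserialized into plain dicts.

```python
from decimal import Decimal


def diff_images(old, new):
    """Compare two deserialized images (plain dicts) key by key."""
    added = {k: new[k] for k in new.keys() - old.keys()}
    removed = {k: old[k] for k in old.keys() - new.keys()}
    changed = {
        k: (old[k], new[k])  # (before, after)
        for k in old.keys() & new.keys()
        if old[k] != new[k]
    }
    return {"added": added, "removed": removed, "changed": changed}


# The MODIFY record from the sample event, after deserialization
print(diff_images(
    {"Message": "New item!", "Id": Decimal("101")},
    {"Message": "This item has changed", "Id": Decimal("101")},
))
# {'added': {}, 'removed': {}, 'changed': {'Message': ('New item!', 'This item has changed')}}
```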


Appendix: DynamoDB Update Sample Events

json


{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "NewImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "2",
      "eventVersion": "1.0",
      "dynamodb": {
        "OldImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "SequenceNumber": "222",
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "SizeBytes": 59,
        "NewImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "MODIFY",
      "eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "3",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "SizeBytes": 38,
        "SequenceNumber": "333",
        "OldImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "REMOVE",
      "eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
      "eventSource": "aws:dynamodb"
    }
  ]
}
