Tenez compte du prétraitement courant lors du traitement du flux DynamoDB avec Lambda (Python)

Il est pratique d'utiliser Lambda en liant les ressources AWS, mais l'événement est compliqué à chaque fois. Cette fois-ci, lors de la prise en charge de DynamoDB Stream en Python, je veux être capable d'écrire autre chose que le traitement principal de manière concise.

Flux DynamoDB et déclencheurs AWS Lambda - Amazon DynamoDB

Le contenu de cet article est enregistré dans PyPI en tant que bibliothèque.

Spécifications de traitement communes

Ce sera plus facile si vous utilisez le traitement ici.

--Je veux enregistrer la fonction correspondante (y compris lambda) et l'exécuter pour chaque enregistrement.

Donc, j'aimerais pouvoir écrire le point d'entrée comme ça.

handler(des plans)


def lambda_handler(event, context):
    ds = DynamoStreamDispatcher(event)
    ds.on_insert.append(lambda rec: print(rec.event_name)) # lambda OK
    ds.on_remove.append(after_remove1)
    ds.on_remove.append(after_remove2) #Plusieurs processus OK
    ds.on_modify.append(after_modify)


    ds.dispatch()
    return True

C'est comme enregistrer un gestionnaire et distribuer.

Je vais le faire à peu près

Pour le moment, j'ai implémenté le contenu que je voulais écrire dans le lambda_handler qui avait été décidé au début.

lambda_function.py


from __future__ import print_function

import json
from boto3.dynamodb.types import TypeDeserializer
deser = TypeDeserializer()

print('Loading function')


class DeRecord:
    ##Article désérialisé
    def __init__(self, rec):
        self.event_name = rec['eventName']
        self.old = self._desi(rec['dynamodb'].get('OldImage'))
        self.new = self._desi(rec['dynamodb'].get('NewImage'))

    def _desi(self, image):
        d = {}
        if image:
            for key in image:
                d[key] = deser.deserialize(image[key])
        return d

        
class DynamoStreamDispatcher:
    def __init__(self, event):
        self.on_insert = []
        self.on_remove = []
        self.on_modify = []
        self.records   = []
        for r in event['Records']:
            #L'enregistrement est transformé en un dict.
            self.records.append(DeRecord(r))

        self.raw = event

    def dispatch(self):
        """
        synced dispatcher
        """
        results = []
        for r in self.records:
            try:
                for runner in getattr(self, 'on_' + r.event_name.lower()):
                    results.append(runner(r))
            except AttributeError:
                print("Unknown event " + r.event_name)
                continue
                    
        return results


##À partir de là, un exemple de fonction de traitement pour Lambda individuel. L'argument est un enregistrement traité
def after_remove1(rec):
    print("deleted")
    return None

def after_remove2(rec):
    print(rec.old)
    return None


def after_modify(rec):
    print("key updated...")
    print(rec.old['Message'])
    print(rec.new['Message'])
    return None


def lambda_handler(event, context):
    ds = DynamoStreamDispatcher(event)
    ds.on_insert.append(lambda rec: print(rec.event_name))
    ds.on_remove.append(after_remove1)
    ds.on_remove.append(after_remove2)
    ds.on_modify.append(after_modify)

    ds.dispatch()
    return True

Essayez d'exécuter Dynamo DB Update (* joint à la fin) à partir du modèle d'événement Exemple.

START RequestId: 6ed79996-0ecc-11e7-8985-db0ca21254c3 Version: $LATEST
INSERT
key updated...
New item!
This item has changed
deleted
{u'Message': u'This item has changed', u'Id': Decimal('101')}
END RequestId: 6ed79996-0ecc-11e7-8985-db0ca21254c3

Chaque fonction enregistrée est exécutée et OK.

C'est presque ma propre bibliothèque, mais s'il n'y a rien de similaire à PyPI, je pense que je peux m'enregistrer et l'utiliser. Après cela, je veux stocker les différences et ainsi de suite.


Annexe: exemple d'événement de mise à jour DynamoDB

python


{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "NewImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "2",
      "eventVersion": "1.0",
      "dynamodb": {
        "OldImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "SequenceNumber": "222",
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "SizeBytes": 59,
        "NewImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "MODIFY",
      "eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "3",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "SizeBytes": 38,
        "SequenceNumber": "333",
        "OldImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "REMOVE",
      "eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
      "eventSource": "aws:dynamodb"
    }
  ]
}

référence:

Recommended Posts

Tenez compte du prétraitement courant lors du traitement du flux DynamoDB avec Lambda (Python)
Manipulation des données DynamoDB avec Lambda (Node et Python)
Utiliser DynamoDB avec Python
Traitement d'image avec Python
Écrire plusieurs enregistrements dans DynamoDB avec Lambda (Python, JavaScript)
100 coups de traitement du langage avec Python 2015
Exploitez TwitterBot avec Lambda, Python
"Traitement Apple" avec OpenCV3 + Python3
Traitement du signal acoustique avec Python (2)
Traitement du signal acoustique avec Python
Traitement d'image avec Python (partie 1)
Traitement d'image avec Python (3)
Erreur lors de la lecture avec python
[Python] Traitement d'image avec scicit-image
[Python] Traitement parallèle facile avec Joblib
100 traitements de langage avec Python
Détection de visage avec Lambda (Python) + Rekognition
[Traitement du langage naturel] Prétraitement avec le japonais
100 traitements de langage avec Python (chapitre 3)
Traitement d'image avec la binarisation Python 100 knocks # 3
mappe de traitement de chaîne python et lambda
Notifier HipChat avec AWS Lambda (Python)
100 traitement d'image par Python Knock # 2 Échelle de gris
Quand matplotlib ne fonctionne pas avec python2.7
Utiliser PostgreSQL avec Lambda (Python + psycopg2)
Lors de l'utilisation de MeCab avec python dans virtualenv
Précautions lors de l'utilisation de six avec Python 2.5
[Python] Formater quand to_csv avec des pandas
ImportError lors de la tentative d'utilisation du package gcloud avec la version AWS Lambda Python
Bases du traitement d'images binarisées par Python
Traitement d'image par Python 100 knock # 10 filtre médian
[AWS] Utilisation de fichiers ini avec Lambda [Python]
Extrait de code pour une recherche de bits complète avec python
Traitement d'exécution périodique lors de l'utilisation de tkinter [Python3]
100 traitement d'image avec Python Knock # 8 Max Pooling
Remarques lors de la création d'un environnement avec python
Effectuez périodiquement un traitement arbitraire avec Python Twisted
Points à noter lors de la résolution de problèmes DP avec Python
Laissez Heroku faire le traitement en arrière-plan avec Python
100 traitements de langage avec Python (chapitre 2, partie 2)
Traitement d'image avec Python et OpenCV [Tone Curve]
3. Traitement du langage naturel par Python 2-1. Réseau de co-occurrence
Traitement d'image par Python 100 knock # 12 motion filter
3. Traitement du langage naturel par Python 1-1. Word N-gram
Traitement courant lors de la demande de vue basée sur les classes
100 traitements de langage avec Python (chapitre 2, partie 1)
Dessin avec Matrix-Reinventor of Python Image Processing-
Traitez facilement des images en Python avec Pillow
Connectez-vous à s3 avec AWS Lambda Python
Traitement d'image avec Python 100 knocks # 7 pooling moyen
Diffusez les mises à jour de Redmine vers Hipchat avec Python
Traitement d'image léger avec Python x OpenCV
Traitement d'image avec Lambda + OpenCV (création d'image grise)
API REST facile avec API Gateway / Lambda / DynamoDB
Essayez d'attribuer ou de changer avec Python: lambda
Traitement d'image par Python 100 knock # 9 Filtre Gaussien
Essayez le traitement d'image avec Python lorsque vous êtes invité à vous divertir lors d'un mariage
Ce que j'ai fait quand je suis resté coincé dans le délai avec lambda python