[PYTHON] J'ai essayé de créer un traitement par lots sans serveur pour la première fois avec DynamoDB et Step Functions

Lot sans serveur sur AWS

Entre deux comptes AWS (appairage de VPC terminé), traitez les données d'une base de données à l'autre pour créer un lot comme.

qiita1.png

En utilisant activement les services AWS et en étudiant, Tout d'abord, placez DynamoDB entre et divisez [Lambda](https://docs.aws.amazon. com / ja_jp / lambda / latest / dg / welcome.html) Il est traité par fonction.

qiita2.png

Les informations d'accès à RDS sont stockées à l'aide de Secrets Manager et la série de flux est [Step Functions]. J'ai essayé de le construire avec (https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/welcome.html). Planifiez ceci sur Événements ClooudWatch.

qiita3.png

Construisons chacun.

① Lambda [RDS to DynamoDB] L'accès à RDS avec Lambda est un anti-pattern en raison du problème du nombre de pools de connexions [^ 1], Étant donné que ce lot est un déclencheur de planification, le nombre d'exécutions simultanées ne devrait pas être un problème.

cf. Lambda + RDS est un anti-pattern

[^ 1]: Aperçu du proxy RDS annoncé Semble être résolu.

Placez-le dans le VPC pour l'accès RDS. Les stratégies IAM requises sont les suivantes.

Le runtime est ** Python 3.8 **. En plus de boto3, la bibliothèque utilise pymysql pour l'accès RDS (MySQL). (Pip installer localement et télécharger le zip)

Secrets Manager Il stocke des informations secrètes telles que RDS et DocumentDB, et semble gérer automatiquement la rotation des mots de passe en fonction des paramètres. Vous pouvez stocker n'importe quel élément en tant que secret, pas seulement votre identifiant et votre mot de passe. Exemple) Hôte, port, nom de la base de données, etc.

Au fait, lorsque j'essaye de définir avec "RDS" ici, je ne peux sélectionner que des instances RDS sous le compte correspondant. Cette fois, je voulais créer les informations secrètes du RDS de la destination Peering, j'ai donc décidé de les créer comme un secret arbitraire dans "Autre". (Pas de rotation)

Après la création, un exemple de code pour chaque langue sera généré. Vous pouvez simplement le copier et le coller. (Peut être confirmé plus tard)

get_secret_value_response['SecretString']Ou base64.b64decode(get_secret_value_response['SecretBinary'])On dit qu'il sera utilisé, donc je vais le retourner.



 L'appelant ressemble à ceci.


#### **`lambda.py`**
```python

    secrets = json.loads(get_secret())

    conn = pymysql.connect(
            secrets['host'],
            user=secrets['username'],
            passwd=secrets['password'],
            db=secrets['dbname'],
            cursorclass=pymysql.cursors.DictCursor
        )

DynamoDB Créez la table requise à l'avance. La clé primaire se compose d'une ** clé de partition ** (obligatoire) et d'une ** clé de tri ** (facultatif).

cf. J'ai résumé l'index clé de DynamoDB

Les enregistrements dans Relational DB semblent être appelés ** Item ** dans DynamoDB.

Entrez les données avec PutItem. Les types pris en charge sont différents des types MySQL, donc si vous essayez de saisir le type de date tel quel, une erreur se produira.

cf. Règles de dénomination et types de données # Types de données

De plus, comme les caractères vides ne peuvent pas être saisis, il est nécessaire de spécifier explicitement le type Null.

cf. L'histoire selon laquelle les données ne pouvaient pas être saisies dans DynamoDB si elles étaient laissées vides lors de l'utilisation de Boto3

cf. DynamoDB ne peut pas enregistrer une valeur vide, mais vous pouvez convertir une valeur vide en NULL en passant simplement une option au constructeur du client de document DynamoDB?

lambda.py


import boto3

...

    dynamodb = boto3.resource('dynamodb')

    table = dynamodb.Table('table-name')

    for row in data:
        for k,v in row.items():
            if isinstance(v, str) and v == '':
                #Caractères vides dans DynamoDB''Spécifiez explicitement None car une erreur se produira si vous essayez d'entrer
                row[k] = None
            elif isinstance(v, (datetime.date, datetime.datetime, datetime.time)):
                #DynamoDB ne prend pas en charge le type date / heure, alors convertissez-le en chaîne de caractères
                row[k] = v.isoformat() #Bien sûr, strftime est bien

        #Écrase si la clé existe
        res = table.put_item(
            Item=row
        )

Il existe également un ʻUpdateItem` qui spécifie la colonne à mettre à jour, similaire à UPDATE dans RDB.

② Lambda [DynamoDB to RDS] Il est également placé dans le VPC et la stratégie IAM est la suivante.

DynamoDB Obtenez 1 élément avec GetItem de la spécification de clé primaire.

lambda.py


import boto3

...

    dynamodb = boto3.resource('dynamodb')

    table = dynamodb.Table('table-name')

    #GetItem (avec clé de tri)
    res = table.get_item(
        Key={
            'partition-key-name': VALUE1,
            'sort-key-name': VALUE2
        }
    )

    res['Item'] #L'objet acquis. Si tu ne peux pas l'obtenir'Item'N'existe pas.

Il y a aussi «Requête» et «Scan». Pour comprendre comment l'utiliser correctement ...

Fondamentalement, il semble bon d'utiliser «Query».

cf. J'ai essayé DynamoDB en utilisant Python (boto3)

lambda.py


import boto3
from boto3.dynamodb.conditions import Key #j'ai besoin de ça

...

    dynamodb = boto3.resource('dynamodb')

    table = dynamodb.Table('table-name')

    #Query:Spécification de clé (& avec clé de tri)
    res = table.query(
        KeyConditionExpression=Key('partition-key-name').eq(VALUE1) & Key('sort-key-name').eq(VALUE2)
    )

    #Query:Spécification d'index (& clé de partition uniquement)
    res = table.query(
        IndexName='index-name',
        KeyConditionExpression=Key('partition-key-name').eq(VALUE3)
    )

    res['Count'] #Nombre d'acquisitions
    res['Items'] #Liste des objets acquis

Lors de la création d'un index avec DynamoDB, il s'appelle ** projection ** et ressemble à une image dans laquelle une table de copie est créée. Selon la taille des données de la table, seule la clé doit être reflétée plutôt que toutes les colonnes. Obtenir la clé avec l'index → Récupérer à nouveau l'élément avec la clé Il vaut peut-être mieux faire le traitement.

③ Step Functions Un flux de processus peut être construit à l'aide de la notation JSON. (Il n'est pas limité à Lambda qui peut être combiné) Vous pouvez écrire un branchement conditionnel et un traitement parallèle, ce qui vous permet de créer un flux de manière assez flexible. Pour être honnête, il est difficile de lire et d'écrire s'il ne s'agit que d'une définition, mais c'est utile car cela vérifie la syntaxe et crée automatiquement un diagramme de flux. Il semble que le flux créé s'appelle ** state machine **.

cf. Système de traitement par lots sans serveur créé avec AWS Step Functions

Lors de l'appel de Lambda, «Invoke Function» est requis dans la stratégie IAM.

Les fonctions de ① et ② sont en fait un peu plus subdivisées, nous allons donc créer un flux ici. Le traitement série et le traitement parallèle sont à peu près les suivants.

Traitement série de la fonction Lambda


{
  "Comment": "Commentaire Commentaire Commentaire",
  "StartAt": "First Process",
  "States": {
    "First Process": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
      "Next": "Second Process"
    },
    "Second Process": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
      "End": true
    }
  }
}

Traitement parallèle des fonctions Lambda


{
  "Comment": "Commentaire Commentaire Commentaire",
  "StartAt": "Main Process",
  "States": {
    "Main Process": {
      "Type": "Parallel",
      "End": true,
      "Branches": [
        {
          "StartAt": "Branch Process A",
          "States": {
            "Branch Process A": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
              "End": true
            }
          }
        },
        {
          "StartAt": "Branch Process B",
          "States": {
            "Branch Process B": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
              "End": true
            }
          }
        }
      ]
    }
  }
}

CloudWatch Events Les machines d'état peuvent être déclenchées par CloudWatch Events. Lors de l'exécution d'un planning, il est spécifié par l'expression rate ou l'expression cron, J'étais tellement accro à «cron», donc juste un rappel.

--6 éléments de minute, heure, jour, mois, jour, année

cf. Schedule Expressions for Rules

en conclusion

Tous ces services étaient nouveaux pour moi, mais ils étaient très intéressants. Soyons sans serveur!

Et quand je l'essaye, je me demande quoi faire avec la gestion de code Lambda et l'environnement de développement ...?

Recommended Posts

J'ai essayé de créer un traitement par lots sans serveur pour la première fois avec DynamoDB et Step Functions
J'ai essayé tensorflow pour la première fois
J'ai essayé la programmation python pour la première fois.
J'ai essayé Mind Meld pour la première fois
J'ai essayé Python sur Mac pour la première fois.
AI Gaming Je l'ai essayé pour la première fois
Créez une commande pour rechercher des composés similaires dans la base de données cible avec RDKit et vérifiez le temps de traitement
Je souhaite créer une base de données de déjeuners [EP1] Django pour la première fois
Je souhaite créer une base de données de déjeuner [EP1-4] Django pour la première fois
J'ai essayé de comparer la vitesse de traitement avec dplyr de R et pandas de Python
La première étape de la création d'une application sans serveur avec Zappa
J'ai essayé d'illustrer le temps et le temps du langage C
J'ai essayé d'afficher l'heure et la météo d'aujourd'hui w
J'ai essayé l'API Google Cloud Vision pour la première fois
Je veux créer un Dockerfile pour le moment.
J'ai essayé de publier automatiquement sur ChatWork au moment du déploiement avec Fabric et ChatWork Api
J'ai essayé de décrire le trafic en temps réel avec WebSocket
J'ai essayé de créer des taureaux et des vaches avec un programme shell
Pour le moment, je veux convertir n'importe quel fichier avec ffmpeg !!
Je souhaite séparer le traitement entre le temps de test et l'environnement de production
J'ai essayé d'apprendre l'angle du péché et du cos avec le chainer
J'ai essayé de créer le téléchargement CSV, le traitement des données, la fonction de téléchargement avec Django
Impressions et mémorandum lors de la première utilisation de VScode
Pour la première fois dans Numpy, je vais le mettre à jour de temps en temps
J'ai essayé de contrôler la bande passante et le délai du réseau avec la commande tc
La première étape pour se débarrasser des requêtes lentes! J'ai essayé d'avertir Chatwork des requêtes lentes pour RDS pour MySQL à l'aide de Lambda et de l'AWS CLI v2
J'ai joué avec Floydhub pour le moment
Essayez de publier sur Qiita pour la première fois
J'ai essayé d'extraire des expressions uniques avec la bibliothèque de traitement du langage naturel GiNZA
J'ai essayé de traiter et de transformer l'image et d'élargir les données pour l'apprentissage automatique
J'ai essayé de créer un bouton pour Slack avec Raspeye + Tact Switch
[Introduction à AWS] J'ai essayé de porter une application de conversation et de jouer avec text2speech @ AWS ♪
GTUG Girls + PyLadiesTokyo Meetup Je suis allé au premier machine learning
J'ai essayé de créer un modèle avec l'exemple d'Amazon SageMaker Autopilot
L'histoire qui n'avait rien à voir avec la partition lorsque j'ai fait une sauvegarde de disque avec dd pour la première fois
J'ai essayé d'entraîner la fonction péché avec chainer
J'ai essayé de classer M. Hanyu et M. Hanyu avec le traitement du langage naturel × classificateur Naive Bayes
Ce que je suis entré dans Python pour la première fois
J'ai essayé de lire et d'enregistrer automatiquement avec VOICEROID2 2
J'ai essayé d'implémenter et d'apprendre DCGAN avec PyTorch
La première étape de l'apprentissage automatique ~ Pour ceux qui veulent essayer l'implémentation avec python ~
J'ai essayé de résoudre Soma Cube avec python
J'ai essayé d'automatiser la mise à jour de l'article du blog Livedoor avec Python et sélénium.
Pour la première fois, j'ai découvert Unix (Linux).
J'ai essayé de lire et d'enregistrer automatiquement avec VOICEROID2
J'ai essayé de créer un environnement d'apprentissage amélioré pour Othello avec Open AI gym
sphinx-quickstart est devenu un problème et j'ai essayé de créer une commande alternative et le stress a disparu
J'ai essayé de résoudre le problème avec Python Vol.1
Je voulais juste extraire les données de la date et de l'heure souhaitées avec Django
J'ai essayé d'implémenter Grad-CAM avec keras et tensorflow
J'ai essayé de passer par l'optimisation bayésienne. (Avec des exemples)
J'ai essayé la même analyse de données avec kaggle notebook (python) et PowerBI en même temps ②
J'ai essayé de résoudre le problème de F02 comment écrire en temps réel hors ligne avec Python