[PYTHON] Ich habe zum ersten Mal versucht, mit DynamoDB und Step Functions eine serverlose Stapelverarbeitung zu erstellen

Serverloser Stapel unter AWS

Verarbeiten Sie zwischen zwei AWS-Konten (VPC-Peering abgeschlossen) Daten von einer Datenbank zur anderen, um einen Stapel wie zu erstellen.

qiita1.png

Indem Sie AWS-Services aktiv nutzen und studieren, Legen Sie zunächst DynamoDB dazwischen und teilen Sie Lambda auf. com / ja_jp / lambda / latest / dg / welcome.html) Es wird von der Funktion verarbeitet.

qiita2.png

Zugriffsinformationen auf RDS werden mit Secrets Manager gespeichert, und der Ablauf ist [Schrittfunktionen]. Ich habe versucht, es mit (https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/welcome.html) zu erstellen. Planen Sie dies unter ClooudWatch Events.

qiita3.png

Lassen Sie uns jeden einzelnen bauen.

① Lambda [RDS to DynamoDB] Der Zugriff auf RDS mit Lambda ist aufgrund des Problems der Anzahl der Verbindungspools ein Anti-Pattern [^ 1]. Da dieser Stapel ein Zeitplanauslöser ist, sollte die Anzahl der gleichzeitigen Ausführungen kein Problem darstellen.

vgl. Lambda + RDS ist ein Anti-Pattern

[^ 1]: RDS-Proxy hat Vorschau angekündigt Scheint gelöst zu sein.

Legen Sie es in die VPC für den RDS-Zugriff. Die erforderlichen IAM-Richtlinien lauten wie folgt.

Die Laufzeit ist ** Python 3.8 **. Zusätzlich zu "boto3" verwendet die Bibliothek "pymysql" für den RDS (MySQL) -Zugriff. (Pip lokal installieren und zip hochladen)

Secrets Manager Es speichert geheime Informationen wie RDS und DocumentDB und scheint die Kennwortrotation abhängig von den Einstellungen automatisch zu verwalten. Sie können jeden Gegenstand als Geheimnis speichern, nicht nur Ihre ID und Ihr Passwort. Beispiel) Host, Port, Datenbankname usw.

Wenn ich hier versuche, mit "RDS" zu setzen, kann ich übrigens nur RDS-Instanzen unter dem entsprechenden Konto auswählen. Dieses Mal wollte ich die geheimen Informationen des RDS des Peering-Ziels erstellen, deshalb habe ich beschlossen, sie als willkürliches Geheimnis in "Andere" zu erstellen. (Keine Drehung)

Nach der Erstellung wird ein Beispielcode für jede Sprache generiert. Sie können es einfach kopieren und einfügen. (Kann später bestätigt werden)

get_secret_value_response['SecretString']Oder base64.b64decode(get_secret_value_response['SecretBinary'])Es wird gesagt, dass es verwendet wird, also werde ich es zurückgeben.



 Der Anrufer sieht so aus.


#### **`lambda.py`**
```python

    secrets = json.loads(get_secret())

    conn = pymysql.connect(
            secrets['host'],
            user=secrets['username'],
            passwd=secrets['password'],
            db=secrets['dbname'],
            cursorclass=pymysql.cursors.DictCursor
        )

DynamoDB Erstellen Sie die gewünschte Tabelle im Voraus. Der Primärschlüssel besteht aus einem ** Partitionsschlüssel ** (erforderlich) und einem ** Sortierschlüssel ** (optional).

vgl. Ich habe den Schlüsselindex von DynamoDB zusammengefasst

Datensätze in der relationalen Datenbank scheinen in DynamoDB ** Item ** zu heißen.

Eingabedaten mit PutItem. Die unterstützten Typen unterscheiden sich von den MySQL-Typen. Wenn Sie also versuchen, den Datumstyp unverändert einzugeben, tritt ein Fehler auf.

Siehe Namensregeln und Datentypen #Datentypen

Da keine leeren Zeichen eingegeben werden können, muss der Null-Typ explizit angegeben werden.

vgl. Die Geschichte, dass Daten nicht in DynamoDB eingegeben werden konnten, wenn sie bei Verwendung von Boto3 leer gelassen wurden

vgl. DynamoDB kann keinen leeren Wert registrieren, aber können Sie einen leeren Wert in einen NULL-Typ konvertieren, indem Sie eine Option an den Konstruktor des DynamoDB-Dokumentclients übergeben?

lambda.py


import boto3

...

    dynamodb = boto3.resource('dynamodb')

    table = dynamodb.Table('table-name')

    for row in data:
        for k,v in row.items():
            if isinstance(v, str) and v == '':
                #Leere Zeichen in DynamoDB''Geben Sie explizit None an, da bei der Eingabe ein Fehler auftritt
                row[k] = None
            elif isinstance(v, (datetime.date, datetime.datetime, datetime.time)):
                #DynamoDB unterstützt keinen Datums- / Uhrzeittyp. Konvertieren Sie ihn daher in eine Zeichenfolge
                row[k] = v.isoformat() #Natürlich ist Strftime in Ordnung

        #Überschreibt, wenn Schlüssel vorhanden ist
        res = table.put_item(
            Item=row
        )

Es gibt auch ein UpdateItem, das die zu aktualisierende Spalte angibt, ähnlich wie UPDATE in RDB.

② Lambda [DynamoDB to RDS] Dies wird auch in die VPC gestellt, und die IAM-Richtlinie lautet wie folgt.

DynamoDB Holen Sie sich ein Element mit "GetItem" der Primärschlüsselspezifikation.

lambda.py


import boto3

...

    dynamodb = boto3.resource('dynamodb')

    table = dynamodb.Table('table-name')

    #GetItem (mit Sortierschlüssel)
    res = table.get_item(
        Key={
            'partition-key-name': VALUE1,
            'sort-key-name': VALUE2
        }
    )

    res['Item'] #Der erworbene Gegenstand. Wenn du es nicht bekommen kannst'Item'Ist nicht vorhanden.

Es gibt auch "Abfrage" und "Scan". Als grobes Verständnis, wie man es richtig benutzt ...

Grundsätzlich scheint es gut zu sein, "Query" zu verwenden.

vgl. Ich habe DynamoDB mit Python (boto3) ausprobiert

lambda.py


import boto3
from boto3.dynamodb.conditions import Key #ich brauche das

...

    dynamodb = boto3.resource('dynamodb')

    table = dynamodb.Table('table-name')

    #Query:Schlüsselspezifikation (& mit Sortierschlüssel)
    res = table.query(
        KeyConditionExpression=Key('partition-key-name').eq(VALUE1) & Key('sort-key-name').eq(VALUE2)
    )

    #Query:Indexspezifikation (nur Partitionsschlüssel)
    res = table.query(
        IndexName='index-name',
        KeyConditionExpression=Key('partition-key-name').eq(VALUE3)
    )

    res['Count'] #Anzahl der Akquisitionen
    res['Items'] #Liste der erworbenen Gegenstände

Wenn Sie einen Index mit DynamoDB erstellen, wird er als ** Projektion ** bezeichnet und scheint ein Bild zu sein, in dem eine Kopiertabelle erstellt wird. Abhängig von der Datengröße der Tabelle sollte nur der Schlüssel und nicht alle Spalten wiedergegeben werden. Schlüssel mit Index abrufen → Element mit Schlüssel erneut abrufen Es kann besser sein, die Verarbeitung durchzuführen.

③ Step Functions Ein Prozessablauf kann mithilfe der JSON-Notation erstellt werden. (Es ist nicht auf Lambda beschränkt, das kombiniert werden kann) Sie können bedingte Verzweigungen und Parallelverarbeitung schreiben, sodass Sie einen Fluss ganz flexibel erstellen können. Um ehrlich zu sein, ist es schwierig zu lesen und zu schreiben, wenn es sich nur um eine Definition handelt, aber es ist hilfreich, da es die Syntax überprüft und automatisch ein Flussdiagramm erstellt. Es scheint, dass der erstellte Fluss ** Zustandsmaschine ** heißt.

Vgl. Mit AWS-Schrittfunktionen erstelltes Serverless-Batch-System

Beim Aufruf von Lambda ist in der IAM-Richtlinie "InvokeFunction" erforderlich.

Die Funktionen von ① und ② sind tatsächlich etwas mehr unterteilt, daher werden wir hier einen Fluss erzeugen. Die serielle Verarbeitung und die parallele Verarbeitung sind ungefähr wie folgt.

Serielle Verarbeitung der Lambda-Funktion


{
  "Comment": "Kommentar Kommentar Kommentar",
  "StartAt": "First Process",
  "States": {
    "First Process": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
      "Next": "Second Process"
    },
    "Second Process": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
      "End": true
    }
  }
}

Parallele Verarbeitung von Lambda-Funktionen


{
  "Comment": "Kommentar Kommentar Kommentar",
  "StartAt": "Main Process",
  "States": {
    "Main Process": {
      "Type": "Parallel",
      "End": true,
      "Branches": [
        {
          "StartAt": "Branch Process A",
          "States": {
            "Branch Process A": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
              "End": true
            }
          }
        },
        {
          "StartAt": "Branch Process B",
          "States": {
            "Branch Process B": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
              "End": true
            }
          }
        }
      ]
    }
  }
}

CloudWatch Events Zustandsautomaten können durch CloudWatch-Ereignisse ausgelöst werden. Wenn ein Zeitplan ausgeführt wird, wird er durch den Ausdruck "rate" oder den Ausdruck "cron" angegeben. Ich war so süchtig nach "cron", also nur eine Erinnerung.

cf. Schedule Expressions for Rules

abschließend

Alle diese Dienstleistungen waren für mich neu, aber sie waren sehr interessant. Lass uns ohne Server sein!

Und wenn ich es versuche, frage ich mich, was ich mit der Lambda-Codeverwaltungs- und Entwicklungsumgebung machen soll ...?

Recommended Posts

Ich habe zum ersten Mal versucht, mit DynamoDB und Step Functions eine serverlose Stapelverarbeitung zu erstellen
Ich habe zum ersten Mal Tensorflow ausprobiert
Ich habe zum ersten Mal versucht, Python zu programmieren.
Ich habe Mind Meld zum ersten Mal ausprobiert
Ich habe Python zum ersten Mal auf dem Mac ausprobiert.
AI Gaming Ich habe es zum ersten Mal versucht
Erstellen Sie mit RDKit einen Befehl zum Suchen nach ähnlichen Verbindungen aus der Zieldatenbank und überprüfen Sie die Verarbeitungszeit
Ich möchte zum ersten Mal eine Django-Studie zur Mittagsdatenbank [EP1] erstellen
Ich möchte zum ersten Mal eine Django-Studie zum Mittagessen [EP1-4] erstellen
Ich habe versucht, die Verarbeitungsgeschwindigkeit mit dplyr von R und pandas von Python zu vergleichen
Der erste Schritt beim Erstellen einer serverlosen Anwendung mit Zappa
Ich habe versucht, die Zeit und die Zeit der C-Sprache zu veranschaulichen
Ich habe versucht, die Uhrzeit und das heutige Wetter anzuzeigen
Ich habe die Google Cloud Vision-API zum ersten Mal ausprobiert
Ich möchte vorerst eine Docker-Datei erstellen.
Ich habe versucht, zum Zeitpunkt der Bereitstellung mit Fabric und ChatWork Api automatisch in ChatWork zu posten
Ich habe versucht, den Datenverkehr mit WebSocket in Echtzeit zu beschreiben
Ich habe versucht, Bulls and Cows mit einem Shell-Programm zu erstellen
Vorerst möchte ich jede Datei mit ffmpeg konvertieren !!
Ich möchte die Verarbeitung zwischen Testzeit und Produktionsumgebung trennen
Ich habe versucht, den Winkel von Sin und Cos mit Chainer zu lernen
Ich habe versucht, mit Django eine CSV-Upload-, Datenverarbeitungs- und Download-Funktion zu erstellen
Impressionen und Memorandum bei der ersten Arbeit mit VScode
Zum ersten Mal in Numpy werde ich es von Zeit zu Zeit aktualisieren
Ich habe versucht, die Netzwerkbandbreite und -verzögerung mit dem Befehl tc zu steuern
Der erste Schritt, um langsame Abfragen loszuwerden! Ich habe versucht, Chatwork mit Lambda und AWS CLI v2 über langsame Abfragen für RDS for MySQL zu informieren
Ich habe vorerst mit Floydhub gespielt
Versuchen Sie zum ersten Mal, in Qiita zu posten
Ich habe versucht, mit der Bibliothek GiNZA zur Verarbeitung natürlicher Sprache eindeutige Ausdrücke zu extrahieren
Ich habe versucht, das Bild zu verarbeiten und zu transformieren und die Daten für maschinelles Lernen zu erweitern
Ich habe versucht, mit Raspeye + Tact Switch eine Schaltfläche für Slack zu erstellen
[Einführung in AWS] Ich habe versucht, eine Konversations-App zu portieren und mit text2speech @ AWS playing zu spielen
GTUG Girls + PyLadiesTokyo Meetup Ich ging zum ersten maschinellen Lernen
Ich habe versucht, ein Modell mit dem Beispiel von Amazon SageMaker Autopilot zu erstellen
Die Geschichte, die nichts mit der Partition zu tun hatte, als ich zum ersten Mal eine Festplatten-Sicherung mit dd durchführte
Ich habe versucht, die Sündenfunktion mit Chainer zu trainieren
Ich habe versucht, Mr. Hanyu und Mr. Hanyu mit dem Verarbeiter der Verarbeitung natürlicher Sprache × Naive Bayes zu klassifizieren
Was ich zum ersten Mal in Python bekommen habe
Ich habe versucht, mit VOICEROID2 2 automatisch zu lesen und zu speichern
Ich habe versucht, DCGAN mit PyTorch zu implementieren und zu lernen
Der erste Schritt des maschinellen Lernens ~ Für diejenigen, die versuchen möchten, mit Python zu implementieren ~
Ich habe versucht, Soma Cube mit Python zu lösen
Ich habe versucht, das Artikel-Update des Livedoor-Blogs mit Python und Selen zu automatisieren.
Zum ersten Mal habe ich etwas über Unix (Linux) gelernt.
Ich habe versucht, mit VOICEROID2 automatisch zu lesen und zu speichern
Ich habe versucht, mit Open AI Gym eine verbesserte Lernumgebung für Othello zu schaffen
Sphinx-Schnellstart wurde zu einem Problem und ich versuchte, einen alternativen Befehl zu erstellen, und der Stress verschwand
Ich habe versucht, das Problem mit Python Vol.1 zu lösen
Ich wollte nur die Daten des gewünschten Datums und der gewünschten Uhrzeit mit Django extrahieren
Ich habe versucht, Grad-CAM mit Keras und Tensorflow zu implementieren
Ich habe versucht, die Bayes'sche Optimierung zu durchlaufen. (Mit Beispielen)
Ich habe die gleiche Datenanalyse mit kaggle notebook (python) und PowerBI gleichzeitig versucht ②
Ich habe versucht, das Problem von F02 zu lösen, wie man mit Python offline in Echtzeit schreibt