Verarbeiten Sie zwischen zwei AWS-Konten (VPC-Peering abgeschlossen) Daten von einer Datenbank zur anderen, um einen Stapel wie zu erstellen.
Indem Sie AWS-Services aktiv nutzen und studieren, Legen Sie zunächst DynamoDB dazwischen und teilen Sie Lambda auf. com / ja_jp / lambda / latest / dg / welcome.html) Es wird von der Funktion verarbeitet.
Zugriffsinformationen auf RDS werden mit Secrets Manager gespeichert, und der Ablauf ist [Schrittfunktionen]. Ich habe versucht, es mit (https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/welcome.html) zu erstellen. Planen Sie dies unter ClooudWatch Events.
Lassen Sie uns jeden einzelnen bauen.
① Lambda [RDS to DynamoDB] Der Zugriff auf RDS mit Lambda ist aufgrund des Problems der Anzahl der Verbindungspools ein Anti-Pattern [^ 1]. Da dieser Stapel ein Zeitplanauslöser ist, sollte die Anzahl der gleichzeitigen Ausführungen kein Problem darstellen.
vgl. Lambda + RDS ist ein Anti-Pattern
[^ 1]: RDS-Proxy hat Vorschau angekündigt Scheint gelöst zu sein.
Legen Sie es in die VPC für den RDS-Zugriff. Die erforderlichen IAM-Richtlinien lauten wie folgt.
AWSLambdaVPCAccessExecutionRole
--SecretsManager Leseberechtigung (DescribeSecret
, GetSecretValue
)
--DynamoDB Schreibberechtigung (DescribeTable
, PutItem
)Die Laufzeit ist ** Python 3.8 **. Zusätzlich zu "boto3" verwendet die Bibliothek "pymysql" für den RDS (MySQL) -Zugriff. (Pip lokal installieren und zip hochladen)
Secrets Manager Es speichert geheime Informationen wie RDS und DocumentDB und scheint die Kennwortrotation abhängig von den Einstellungen automatisch zu verwalten. Sie können jeden Gegenstand als Geheimnis speichern, nicht nur Ihre ID und Ihr Passwort. Beispiel) Host, Port, Datenbankname usw.
Wenn ich hier versuche, mit "RDS" zu setzen, kann ich übrigens nur RDS-Instanzen unter dem entsprechenden Konto auswählen. Dieses Mal wollte ich die geheimen Informationen des RDS des Peering-Ziels erstellen, deshalb habe ich beschlossen, sie als willkürliches Geheimnis in "Andere" zu erstellen. (Keine Drehung)
Nach der Erstellung wird ein Beispielcode für jede Sprache generiert. Sie können es einfach kopieren und einfügen. (Kann später bestätigt werden)
get_secret_value_response['SecretString']Oder base64.b64decode(get_secret_value_response['SecretBinary'])Es wird gesagt, dass es verwendet wird, also werde ich es zurückgeben.
Der Anrufer sieht so aus.
#### **`lambda.py`**
```python
secrets = json.loads(get_secret())
conn = pymysql.connect(
secrets['host'],
user=secrets['username'],
passwd=secrets['password'],
db=secrets['dbname'],
cursorclass=pymysql.cursors.DictCursor
)
DynamoDB Erstellen Sie die gewünschte Tabelle im Voraus. Der Primärschlüssel besteht aus einem ** Partitionsschlüssel ** (erforderlich) und einem ** Sortierschlüssel ** (optional).
vgl. Ich habe den Schlüsselindex von DynamoDB zusammengefasst
Datensätze in der relationalen Datenbank scheinen in DynamoDB ** Item ** zu heißen.
Eingabedaten mit PutItem
.
Die unterstützten Typen unterscheiden sich von den MySQL-Typen. Wenn Sie also versuchen, den Datumstyp unverändert einzugeben, tritt ein Fehler auf.
Siehe Namensregeln und Datentypen #Datentypen
Da keine leeren Zeichen eingegeben werden können, muss der Null-Typ explizit angegeben werden.
lambda.py
import boto3
...
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('table-name')
for row in data:
for k,v in row.items():
if isinstance(v, str) and v == '':
#Leere Zeichen in DynamoDB''Geben Sie explizit None an, da bei der Eingabe ein Fehler auftritt
row[k] = None
elif isinstance(v, (datetime.date, datetime.datetime, datetime.time)):
#DynamoDB unterstützt keinen Datums- / Uhrzeittyp. Konvertieren Sie ihn daher in eine Zeichenfolge
row[k] = v.isoformat() #Natürlich ist Strftime in Ordnung
#Überschreibt, wenn Schlüssel vorhanden ist
res = table.put_item(
Item=row
)
Es gibt auch ein UpdateItem
, das die zu aktualisierende Spalte angibt, ähnlich wie UPDATE in RDB.
② Lambda [DynamoDB to RDS] Dies wird auch in die VPC gestellt, und die IAM-Richtlinie lautet wie folgt.
AWSLambdaVPCAccessExecutionRole
--SecretsManager Leseberechtigung (DescribeSecret
, GetSecretValue
)
--DynamoDB Leseberechtigung (DescribeTable
, GetItem
, Scan
, Query
)DynamoDB Holen Sie sich ein Element mit "GetItem" der Primärschlüsselspezifikation.
lambda.py
import boto3
...
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('table-name')
#GetItem (mit Sortierschlüssel)
res = table.get_item(
Key={
'partition-key-name': VALUE1,
'sort-key-name': VALUE2
}
)
res['Item'] #Der erworbene Gegenstand. Wenn du es nicht bekommen kannst'Item'Ist nicht vorhanden.
Es gibt auch "Abfrage" und "Scan". Als grobes Verständnis, wie man es richtig benutzt ...
Grundsätzlich scheint es gut zu sein, "Query" zu verwenden.
vgl. Ich habe DynamoDB mit Python (boto3) ausprobiert
lambda.py
import boto3
from boto3.dynamodb.conditions import Key #ich brauche das
...
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('table-name')
#Query:Schlüsselspezifikation (& mit Sortierschlüssel)
res = table.query(
KeyConditionExpression=Key('partition-key-name').eq(VALUE1) & Key('sort-key-name').eq(VALUE2)
)
#Query:Indexspezifikation (nur Partitionsschlüssel)
res = table.query(
IndexName='index-name',
KeyConditionExpression=Key('partition-key-name').eq(VALUE3)
)
res['Count'] #Anzahl der Akquisitionen
res['Items'] #Liste der erworbenen Gegenstände
Wenn Sie einen Index mit DynamoDB erstellen, wird er als ** Projektion ** bezeichnet und scheint ein Bild zu sein, in dem eine Kopiertabelle erstellt wird. Abhängig von der Datengröße der Tabelle sollte nur der Schlüssel und nicht alle Spalten wiedergegeben werden. Schlüssel mit Index abrufen → Element mit Schlüssel erneut abrufen Es kann besser sein, die Verarbeitung durchzuführen.
③ Step Functions Ein Prozessablauf kann mithilfe der JSON-Notation erstellt werden. (Es ist nicht auf Lambda beschränkt, das kombiniert werden kann) Sie können bedingte Verzweigungen und Parallelverarbeitung schreiben, sodass Sie einen Fluss ganz flexibel erstellen können. Um ehrlich zu sein, ist es schwierig zu lesen und zu schreiben, wenn es sich nur um eine Definition handelt, aber es ist hilfreich, da es die Syntax überprüft und automatisch ein Flussdiagramm erstellt. Es scheint, dass der erstellte Fluss ** Zustandsmaschine ** heißt.
Vgl. Mit AWS-Schrittfunktionen erstelltes Serverless-Batch-System
Beim Aufruf von Lambda ist in der IAM-Richtlinie "InvokeFunction" erforderlich.
Die Funktionen von ① und ② sind tatsächlich etwas mehr unterteilt, daher werden wir hier einen Fluss erzeugen. Die serielle Verarbeitung und die parallele Verarbeitung sind ungefähr wie folgt.
Serielle Verarbeitung der Lambda-Funktion
{
"Comment": "Kommentar Kommentar Kommentar",
"StartAt": "First Process",
"States": {
"First Process": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
"Next": "Second Process"
},
"Second Process": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
"End": true
}
}
}
Parallele Verarbeitung von Lambda-Funktionen
{
"Comment": "Kommentar Kommentar Kommentar",
"StartAt": "Main Process",
"States": {
"Main Process": {
"Type": "Parallel",
"End": true,
"Branches": [
{
"StartAt": "Branch Process A",
"States": {
"Branch Process A": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
"End": true
}
}
},
{
"StartAt": "Branch Process B",
"States": {
"Branch Process B": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
"End": true
}
}
}
]
}
}
}
CloudWatch Events Zustandsautomaten können durch CloudWatch-Ereignisse ausgelöst werden. Wenn ein Zeitplan ausgeführt wird, wird er durch den Ausdruck "rate" oder den Ausdruck "cron" angegeben. Ich war so süchtig nach "cron", also nur eine Erinnerung.
cf. Schedule Expressions for Rules
Alle diese Dienstleistungen waren für mich neu, aber sie waren sehr interessant. Lass uns ohne Server sein!
Und wenn ich es versuche, frage ich mich, was ich mit der Lambda-Codeverwaltungs- und Entwicklungsumgebung machen soll ...?
Recommended Posts