Ich habe manchmal Daten von DynamoDB nach Aurora MySQL migriert, daher werde ich schreiben, wie es geht. Bitte beachten Sie, dass die Datenmigration nicht für alle Tabellen eine Datenmigration ist, da es sich nur um eine Migration von Daten handelt, die in einer Tabelle gespeichert sind.
Ich denke, es gibt verschiedene Möglichkeiten, von DynamoDB zu Aurora MySQL zu migrieren, aber ich habe den Code für eine schnelle und zuverlässige Migration in verfügbarem (?) Python geschrieben. Obwohl es verfügbar ist, können Daten möglicherweise erneut migriert werden, sodass ich sie wiederverwendbar gemacht habe.
Erstellen Sie zunächst ein Modell für DynamoDB und Aurora MySQL Mit der Standard-Bibliotheksdatenklasse können Sie präzise und übersichtlich schreiben.
Aus diesem Grund ist die Scanfunktion so konzipiert, dass sie rekursiv ausgeführt wird, wenn nicht alle Elemente gleichzeitig abgerufen werden können, da beim Abrufen aller Datensätze aus der DynamoDB-Tabelle Einschränkungen bestehen.
A single Scan operation reads up to the maximum number of items set (if using the Limit parameter) or a maximum of 1 MB of data and then apply any filtering to the results using FilterExpression . If LastEvaluatedKey is present in the response, you need to paginate the result set.
python:models/dynamodb.py
from decimal import Decimal
from dataclasses import dataclass, asdict
from typing import Any, ClassVar, Dict, List
import boto3
from src.utils import convert_to_datetime
@dataclass
class BaseDynamoDB:
table_name: ClassVar[str]
hash_key: ClassVar[str]
@classmethod
def get_client(cls) -> Any:
client = boto3.resource("dynamodb")
return client.Table(cls.table_name)
@classmethod
def scan(
cls, *, recursive: bool = False, exclusive_start_key: Dict = {},
) -> List["BaseDynamoDB"]:
"""Holen Sie sich einige oder alle Elemente aus der DynamoDB-Tabelle
Setzen Sie rekursiv auf True, um alle Datensätze abzurufen
Args:
recursive (bool):
Der Standardwert ist False, um einige Daten abzurufen
Auf True setzen, um alle Datensätze abzurufen
exclusive_start_key (Dict):Primärschlüssel für das erste zu scannende Objekt
Returns:
List["BaseDynamoDB"]:Liste der Tabellenmodellinstanzen
"""
client = cls.get_client()
options = {}
if exclusive_start_key:
options.update(exclusive_start_key)
response = client.scan(**options)
items = list(map(lambda item: cls(**item), response["Items"])) # type: ignore
if recursive and "LastEvaluatedKey" in response:
tmp = cls.scan(
recursive=True,
exclusive_start_key=response["LastEvaluatedKey"],
)
items.extend(tmp)
return items
@dataclass
class Qiita(BaseDynamoDB):
"""Fiktiver Tisch für Qiita"""
table_name: ClassVar[str] = "qiita"
hash_key: ClassVar[str] = "user_id"
user_id: str
created_at: int
updated_at: int
memo: str = ""
def __post_init__(self) -> None:
for attr in ("updated_at", "created_at"):
v = getattr(self, attr)
if isinstance(v, Decimal):
setattr(self, attr, convert_to_datetime(str(v)))
def to_dict(self) -> Dict[str, Any]:
"""Geben Sie die Instanz als Wörterbuch zurück
Returns:
Dict[str, Any]
"""
return asdict(self)
models/aurora.py
from datetime import datetime
from dataclasses import asdict, dataclass, field, InitVar
from typing import Any, ClassVar, Dict
@dataclass
class Qiita:
"""Fiktiver Tisch für Qiita"""
table_name: ClassVar[str] = "qiita"
primary_key: ClassVar[str] = "user_id"
user_id: str
#DynamoDB-Spalte für Datums- und Zeitverwaltung
created_at: InitVar[datetime]
updated_at: InitVar[datetime]
#Aurora MySQL-Spalte für Datums- und Zeitverwaltung
registration_date: datetime = field(init=False)
update_date: datetime = field(init=False)
memo: str = ""
registration_id: str = "DynamoDB"
update_id: str = "DynamoDB"
def __post_init__(
self, created_at: datetime, updated_at: datetime
) -> None:
self.registration_date = created_at
self.update_date = updated_at
def to_dict(self) -> Dict[str, Any]:
"""Geben Sie die Instanz als Wörterbuch zurück
Returns:
Dict[str, Any]
"""
result = asdict(self)
result["registration_date"] = self.registration_date
result["update_date"] = self.update_date
return result
connection.py
import os
from contextlib import contextmanager
from typing import Iterator
import pymysql
from pymysql.connections import Connection
from src.utils import get_logger
logger = get_logger(__name__)
AURORA_DB = os.environ["AURORA_DB"]
AURORA_HOST = os.environ["AURORA_HOST"]
AURORA_PORT = int(os.environ["AURORA_PORT"])
AURORA_USER = os.environ["AURORA_USER"]
AURORA_PASSWORD = os.environ["AURORA_PASSWORD"]
@contextmanager
def connect() -> Iterator[Connection]:
"""Herstellen einer Verbindung mit Aurora MySQL
Returns:
Iterator[Connection]
"""
try:
conn = pymysql.connect(
db=AURORA_DB,
host=AURORA_HOST,
port=AURORA_PORT,
user=AURORA_USER,
password=AURORA_PASSWORD,
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor,
connect_timeout=120,
)
except Exception as err:
logger.error("Verbindung zu Aurora fehlgeschlagen")
raise err
try:
yield conn
finally:
conn.close()
Unten ist das Hauptskript Wenn Sie den Modellnamen ändern, können Sie ihn beliebig oft wiederverwenden.
main.py
from src.db.connection import connect
from src.db.sql import INSERT_QIITA
from src.models.dynamodb import Qiita as DynamoDBQiita
from src.models.mysql import Qiita as MySQLQiita
from src.utils import get_logger
logger = get_logger(__name__)
def main():
logger.info("START")
items = DynamoDBQiita.scan(recursive=True)
logger.info(f"{len(items)}Erworben")
params = list(
map(
lambda item: MySQLQiita(**item.to_dict()).to_dict(),
items,
)
)
try:
with connect() as conn:
with conn.cursor() as cursor:
cursor.executemany(INSERT_QIITA, params)
count = cursor.rowcount
conn.commit()
except Exception as err:
logger.error(err)
logger.error("INSERT-Fehler")
else:
logger.info(f"{count}Erfolgreich EINFÜGEN")
logger.info("END")
if __name__ == "__main__":
main()
IAM
Erstellen Sie einen IAM-Benutzer mit einer schreibgeschützten DynamoDB-Rolle und erhalten Sie eine Zugriffsschlüssel-ID und einen geheimen Zugriffsschlüssel. Aurora MySQL ist nicht erforderlich, da es sich wie gewöhnliches MySQL mit Benutzername und Passwort authentifiziert
.env
Bereiten Sie eine .env-Datei vor, damit pipenv die Anmeldeinformationsdaten liest
.env
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_DEFAULT_REGION=
AURORA_DB=
AURORA_HOST=
AURORA_PORT=
AURORA_USER=
AURORA_PASSWORD=
Makefile
Ich habe das Makefile geschrieben, damit ich dasselbe Skript für die Bereitstellung und Produktion verwenden und einfach einen einfachen Befehl ausführen kann. Wenn ich ein Python-Skript mit pipenv ausführe, möchte ich den geheimen Schlüssel aus der .env-Datei lesen, daher habe ich für jede Umgebung einen symbolischen Link von .env zur .env. $ (Stage) -Datei hergestellt.
Makefile
stage = stg
.PHONY: dymy
dymy:
ln -sf .env.$(stage) .env
pipenv run python -m src.main
Wenn Sie ein Python-Skript mit pipenv ausführen möchten, können Sie es im Skriptabschnitt von Pipfile schreiben. Es wird jedoch empfohlen, Makefile zu schreiben, wenn Sie eine andere Verarbeitung wie diese durchführen möchten. Nun, Shell-Skript ist in Ordnung, aber ich persönlich bevorzuge Makefile
Geben Sie bei der Ausführung in einer Produktionsumgebung prod im Argument stage an.
make dymy stage=prod
Damit ist die Datenmigration von DynamoDB zu Aurora MySQL abgeschlossen. Wenn Sie wirklich überprüfen möchten, ob die Migration erfolgreich war, stimmt die Anzahl der in der Tabelle der DynamoDB-Quelle gespeicherten Elemente mit der Anzahl der Zählungen überein, die durch Ausführen von "SELECT count (*) FROM ~" auf dem Ziel Aurora MySQL erhalten wurden. Bitte sehen Sie, ob
Recommended Posts