J'ai parfois migré des données de DynamoDB vers Aurora MySQL, donc j'écrirai comment le faire. Veuillez noter que la migration de données n'est pas une migration de données pour toutes les tables, car il s'agit uniquement d'une migration de données stockées dans une table.
Je pense qu'il existe plusieurs façons de migrer de DynamoDB vers Aurora MySQL, mais j'ai écrit le code en Python jetable (?) Pour une migration rapide et fiable. Même si elles sont jetables, les données peuvent être migrées à nouveau, donc je les ai rendues réutilisables.
Tout d'abord, créez un modèle pour DynamoDB et Aurora MySQL Vous pouvez écrire de manière concise et nette en utilisant la classe de données de bibliothèque standard.
En tant que point, la fonction d'analyse est conçue pour s'exécuter de manière récursive en tenant compte du cas où il n'est pas possible d'acquérir tous les éléments de la table DynamoDB à la fois en raison de la limitation.
Depuis Documents officiels
A single Scan operation reads up to the maximum number of items set (if using the Limit parameter) or a maximum of 1 MB of data and then apply any filtering to the results using FilterExpression . If LastEvaluatedKey is present in the response, you need to paginate the result set.
python:models/dynamodb.py
from decimal import Decimal
from dataclasses import dataclass, asdict
from typing import Any, ClassVar, Dict, List
import boto3
from src.utils import convert_to_datetime
@dataclass
class BaseDynamoDB:
table_name: ClassVar[str]
hash_key: ClassVar[str]
@classmethod
def get_client(cls) -> Any:
client = boto3.resource("dynamodb")
return client.Table(cls.table_name)
@classmethod
def scan(
cls, *, recursive: bool = False, exclusive_start_key: Dict = {},
) -> List["BaseDynamoDB"]:
"""Obtenir certains ou tous les éléments de la table DynamoDB
Définissez récursif sur True pour obtenir tous les enregistrements
Args:
recursive (bool):
La valeur par défaut est False pour obtenir des données
Définir sur True pour obtenir tous les enregistrements
exclusive_start_key (Dict):Clé primaire du premier élément à analyser
Returns:
List["BaseDynamoDB"]:Liste d'instances de modèle de table
"""
client = cls.get_client()
options = {}
if exclusive_start_key:
options.update(exclusive_start_key)
response = client.scan(**options)
items = list(map(lambda item: cls(**item), response["Items"])) # type: ignore
if recursive and "LastEvaluatedKey" in response:
tmp = cls.scan(
recursive=True,
exclusive_start_key=response["LastEvaluatedKey"],
)
items.extend(tmp)
return items
@dataclass
class Qiita(BaseDynamoDB):
"""Table fictive pour Qiita"""
table_name: ClassVar[str] = "qiita"
hash_key: ClassVar[str] = "user_id"
user_id: str
created_at: int
updated_at: int
memo: str = ""
def __post_init__(self) -> None:
for attr in ("updated_at", "created_at"):
v = getattr(self, attr)
if isinstance(v, Decimal):
setattr(self, attr, convert_to_datetime(str(v)))
def to_dict(self) -> Dict[str, Any]:
"""Renvoie l'instance sous forme de dictionnaire
Returns:
Dict[str, Any]
"""
return asdict(self)
models/aurora.py
from datetime import datetime
from dataclasses import asdict, dataclass, field, InitVar
from typing import Any, ClassVar, Dict
@dataclass
class Qiita:
"""Table fictive pour Qiita"""
table_name: ClassVar[str] = "qiita"
primary_key: ClassVar[str] = "user_id"
user_id: str
#Colonne de gestion de la date et de l'heure DynamoDB
created_at: InitVar[datetime]
updated_at: InitVar[datetime]
#Colonne de gestion de la date et de l'heure Aurora MySQL
registration_date: datetime = field(init=False)
update_date: datetime = field(init=False)
memo: str = ""
registration_id: str = "DynamoDB"
update_id: str = "DynamoDB"
def __post_init__(
self, created_at: datetime, updated_at: datetime
) -> None:
self.registration_date = created_at
self.update_date = updated_at
def to_dict(self) -> Dict[str, Any]:
"""Renvoie l'instance sous forme de dictionnaire
Returns:
Dict[str, Any]
"""
result = asdict(self)
result["registration_date"] = self.registration_date
result["update_date"] = self.update_date
return result
connection.py
import os
from contextlib import contextmanager
from typing import Iterator
import pymysql
from pymysql.connections import Connection
from src.utils import get_logger
logger = get_logger(__name__)
AURORA_DB = os.environ["AURORA_DB"]
AURORA_HOST = os.environ["AURORA_HOST"]
AURORA_PORT = int(os.environ["AURORA_PORT"])
AURORA_USER = os.environ["AURORA_USER"]
AURORA_PASSWORD = os.environ["AURORA_PASSWORD"]
@contextmanager
def connect() -> Iterator[Connection]:
"""Établir une connexion avec Aurora MySQL
Returns:
Iterator[Connection]
"""
try:
conn = pymysql.connect(
db=AURORA_DB,
host=AURORA_HOST,
port=AURORA_PORT,
user=AURORA_USER,
password=AURORA_PASSWORD,
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor,
connect_timeout=120,
)
except Exception as err:
logger.error("Échec de la connexion à Aurora")
raise err
try:
yield conn
finally:
conn.close()
Ci-dessous le script principal Si vous modifiez le nom du modèle, vous pouvez le réutiliser autant que vous le souhaitez.
main.py
from src.db.connection import connect
from src.db.sql import INSERT_QIITA
from src.models.dynamodb import Qiita as DynamoDBQiita
from src.models.mysql import Qiita as MySQLQiita
from src.utils import get_logger
logger = get_logger(__name__)
def main():
logger.info("START")
items = DynamoDBQiita.scan(recursive=True)
logger.info(f"{len(items)}Acquis")
params = list(
map(
lambda item: MySQLQiita(**item.to_dict()).to_dict(),
items,
)
)
try:
with connect() as conn:
with conn.cursor() as cursor:
cursor.executemany(INSERT_QIITA, params)
count = cursor.rowcount
conn.commit()
except Exception as err:
logger.error(err)
logger.error("INSERT échec")
else:
logger.info(f"{count}INSERT réussi")
logger.info("END")
if __name__ == "__main__":
main()
IAM
Créez un utilisateur IAM avec un rôle en lecture seule DynamoDB et obtenez un ID de clé d'accès et une clé d'accès secrète. Aurora MySQL n'est pas nécessaire car il s'authentifie avec un nom d'utilisateur et un mot de passe comme MySQL ordinaire
.env
Préparez un fichier .env pour que pipenv lise les données d'identification
.env
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_DEFAULT_REGION=
AURORA_DB=
AURORA_HOST=
AURORA_PORT=
AURORA_USER=
AURORA_PASSWORD=
Makefile
J'ai écrit le Makefile pour pouvoir utiliser le même script pour la mise en scène et la production, et simplement exécuter une commande simple. Lors de l'exécution d'un script python avec pipenv, je souhaite lire la clé secrète du fichier .env, j'ai donc créé un lien symbolique de .env vers le fichier .env. $ (Stage) pour chaque environnement.
Makefile
stage = stg
.PHONY: dymy
dymy:
ln -sf .env.$(stage) .env
pipenv run python -m src.main
En passant, si vous souhaitez exécuter un script python avec pipenv, vous pouvez l'écrire dans la section scripts de Pipfile, mais il est recommandé d'écrire Makefile lorsque vous souhaitez effectuer d'autres traitements comme cette fois. Eh bien, le script shell est bien, mais je préfère personnellement Makefile
Lors de l'exécution dans un environnement de production, spécifiez prod dans l'argument stage.
make dymy stage=prod
Ceci termine la migration des données de DynamoDB vers Aurora MySQL.
Si vous voulez vraiment vérifier si la migration a réussi, le nombre d'éléments stockés dans la table du DynamoDB source correspond au nombre de décomptes obtenu en exécutant SELECT count (*) FROM ~
sur la destination Aurora MySQL. Veuillez voir si
Recommended Posts