[PYTHON] [AWS] Migrer les données de DynamoDB vers Aurora MySQL

J'ai parfois migré des données de DynamoDB vers Aurora MySQL, donc j'écrirai comment le faire. Veuillez noter que la migration de données n'est pas une migration de données pour toutes les tables, car il s'agit uniquement d'une migration de données stockées dans une table.

screenshot 2020-05-09 12.40.26.png

Méthode

Je pense qu'il existe plusieurs façons de migrer de DynamoDB vers Aurora MySQL, mais j'ai écrit le code en Python jetable (?) Pour une migration rapide et fiable. Même si elles sont jetables, les données peuvent être migrées à nouveau, donc je les ai rendues réutilisables.

la mise en oeuvre

code

Tout d'abord, créez un modèle pour DynamoDB et Aurora MySQL Vous pouvez écrire de manière concise et nette en utilisant la classe de données de bibliothèque standard.

En tant que point, la fonction d'analyse est conçue pour s'exécuter de manière récursive en tenant compte du cas où il n'est pas possible d'acquérir tous les éléments de la table DynamoDB à la fois en raison de la limitation.

Depuis Documents officiels

A single Scan operation reads up to the maximum number of items set (if using the Limit parameter) or a maximum of 1 MB of data and then apply any filtering to the results using FilterExpression . If LastEvaluatedKey is present in the response, you need to paginate the result set.

python:models/dynamodb.py


from decimal import Decimal
from dataclasses import dataclass, asdict
from typing import Any, ClassVar, Dict, List

import boto3

from src.utils import convert_to_datetime


@dataclass
class BaseDynamoDB:
    table_name: ClassVar[str]
    hash_key: ClassVar[str]

    @classmethod
    def get_client(cls) -> Any:
        client = boto3.resource("dynamodb")
        return client.Table(cls.table_name)

    @classmethod
    def scan(
        cls, *, recursive: bool = False, exclusive_start_key: Dict = {},
    ) -> List["BaseDynamoDB"]:
        """Obtenir certains ou tous les éléments de la table DynamoDB

Définissez récursif sur True pour obtenir tous les enregistrements

        Args:
            recursive (bool):
La valeur par défaut est False pour obtenir des données
Définir sur True pour obtenir tous les enregistrements
            exclusive_start_key (Dict):Clé primaire du premier élément à analyser

        Returns:
            List["BaseDynamoDB"]:Liste d'instances de modèle de table
        """

        client = cls.get_client()
        options = {}

        if exclusive_start_key:
            options.update(exclusive_start_key)

        response = client.scan(**options)
        items = list(map(lambda item: cls(**item), response["Items"]))  # type: ignore

        if recursive and "LastEvaluatedKey" in response:
            tmp = cls.scan(
                recursive=True,
                exclusive_start_key=response["LastEvaluatedKey"],
            )
            items.extend(tmp)

        return items


@dataclass
class Qiita(BaseDynamoDB):
    """Table fictive pour Qiita"""

    table_name: ClassVar[str] = "qiita"
    hash_key: ClassVar[str] = "user_id"

    user_id: str
    created_at: int
    updated_at: int
    memo: str = ""

    def __post_init__(self) -> None:
        for attr in ("updated_at", "created_at"):
            v = getattr(self, attr)
            if isinstance(v, Decimal):
                setattr(self, attr, convert_to_datetime(str(v)))

    def to_dict(self) -> Dict[str, Any]:
        """Renvoie l'instance sous forme de dictionnaire

        Returns:
            Dict[str, Any]
        """

        return asdict(self)

models/aurora.py


from datetime import datetime
from dataclasses import asdict, dataclass, field, InitVar
from typing import Any, ClassVar, Dict


@dataclass
class Qiita:
    """Table fictive pour Qiita"""

    table_name: ClassVar[str] = "qiita"
    primary_key: ClassVar[str] = "user_id"

    user_id: str

    #Colonne de gestion de la date et de l'heure DynamoDB
    created_at: InitVar[datetime]
    updated_at: InitVar[datetime]
    #Colonne de gestion de la date et de l'heure Aurora MySQL
    registration_date: datetime = field(init=False)
    update_date: datetime = field(init=False)

    memo: str = ""
    registration_id: str = "DynamoDB"
    update_id: str = "DynamoDB"

    def __post_init__(
        self, created_at: datetime, updated_at: datetime
    ) -> None:
        self.registration_date = created_at
        self.update_date = updated_at

    def to_dict(self) -> Dict[str, Any]:
        """Renvoie l'instance sous forme de dictionnaire

        Returns:
            Dict[str, Any]
        """

        result = asdict(self)
        result["registration_date"] = self.registration_date
        result["update_date"] = self.update_date

        return result

connection.py


import os
from contextlib import contextmanager
from typing import Iterator

import pymysql
from pymysql.connections import Connection

from src.utils import get_logger


logger = get_logger(__name__)


AURORA_DB = os.environ["AURORA_DB"]
AURORA_HOST = os.environ["AURORA_HOST"]
AURORA_PORT = int(os.environ["AURORA_PORT"])
AURORA_USER = os.environ["AURORA_USER"]
AURORA_PASSWORD = os.environ["AURORA_PASSWORD"]


@contextmanager
def connect() -> Iterator[Connection]:
    """Établir une connexion avec Aurora MySQL

    Returns:
        Iterator[Connection]
    """

    try:
        conn = pymysql.connect(
            db=AURORA_DB,
            host=AURORA_HOST,
            port=AURORA_PORT,
            user=AURORA_USER,
            password=AURORA_PASSWORD,
            charset="utf8mb4",
            cursorclass=pymysql.cursors.DictCursor,
            connect_timeout=120,
        )
    except Exception as err:
        logger.error("Échec de la connexion à Aurora")
        raise err

    try:
        yield conn
    finally:
        conn.close()

Ci-dessous le script principal Si vous modifiez le nom du modèle, vous pouvez le réutiliser autant que vous le souhaitez.

main.py


from src.db.connection import connect
from src.db.sql import INSERT_QIITA
from src.models.dynamodb import Qiita as DynamoDBQiita
from src.models.mysql import Qiita as MySQLQiita
from src.utils import get_logger

logger = get_logger(__name__)


def main():
    logger.info("START")

    items = DynamoDBQiita.scan(recursive=True)
    logger.info(f"{len(items)}Acquis")

    params = list(
        map(
            lambda item: MySQLQiita(**item.to_dict()).to_dict(),
            items,
        )
    )

    try:
        with connect() as conn:
            with conn.cursor() as cursor:
                cursor.executemany(INSERT_QIITA, params)
                count = cursor.rowcount
            conn.commit()
    except Exception as err:
        logger.error(err)
        logger.error("INSERT échec")
    else:
        logger.info(f"{count}INSERT réussi")

    logger.info("END")


if __name__ == "__main__":
    main()

Migration

Préparation

IAM

Créez un utilisateur IAM avec un rôle en lecture seule DynamoDB et obtenez un ID de clé d'accès et une clé d'accès secrète. Aurora MySQL n'est pas nécessaire car il s'authentifie avec un nom d'utilisateur et un mot de passe comme MySQL ordinaire

.env

Préparez un fichier .env pour que pipenv lise les données d'identification

.env


AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_DEFAULT_REGION=
AURORA_DB=
AURORA_HOST=
AURORA_PORT=
AURORA_USER=
AURORA_PASSWORD=

Makefile

J'ai écrit le Makefile pour pouvoir utiliser le même script pour la mise en scène et la production, et simplement exécuter une commande simple. Lors de l'exécution d'un script python avec pipenv, je souhaite lire la clé secrète du fichier .env, j'ai donc créé un lien symbolique de .env vers le fichier .env. $ (Stage) pour chaque environnement.

Makefile


stage = stg

.PHONY: dymy
dymy:
	ln -sf .env.$(stage) .env
	pipenv run python -m src.main

En passant, si vous souhaitez exécuter un script python avec pipenv, vous pouvez l'écrire dans la section scripts de Pipfile, mais il est recommandé d'écrire Makefile lorsque vous souhaitez effectuer d'autres traitements comme cette fois. Eh bien, le script shell est bien, mais je préfère personnellement Makefile

Adieu DynamoDB, bienvenue à Aurora MySQL

Lors de l'exécution dans un environnement de production, spécifiez prod dans l'argument stage.

make dymy stage=prod

Ceci termine la migration des données de DynamoDB vers Aurora MySQL. Si vous voulez vraiment vérifier si la migration a réussi, le nombre d'éléments stockés dans la table du DynamoDB source correspond au nombre de décomptes obtenu en exécutant SELECT count (*) FROM ~ sur la destination Aurora MySQL. Veuillez voir si

référence

Recommended Posts

[AWS] Migrer les données de DynamoDB vers Aurora MySQL
Vider les données SQLite3 et migrer vers MySQL
Migrer de requirements.txt vers pipenv
De l'installation d'Elasticsearch à la saisie des données
Impossible de migrer de direct_to_template vers TemplateView
Changer l'instance AWS EC2 de t2 à t3
Recevoir des données textuelles de mysql avec python
SIGNATURE Quête ① De la lecture des données au prétraitement
Connectez-vous au conteneur MySQL de Docker depuis Flask
[Bases de la science des données] J'ai essayé d'enregistrer de csv à mysql avec python
Migrez vos propres données CMS vers WordPress
Connexion de python à MySQL sur CentOS 6.4
[Kaggle] De la lecture des données au prétraitement et au codage
Envoyer des données depuis Raspberry Pi à l'aide d'AWS IOT
Récupération des données de MacNote3 et migration vers Write
[Python] Flux du scraping Web à l'analyse des données
Impossible de se connecter à MySQL depuis l'environnement Docker (Debian)
Somme de 1 à 10
Connectez-vous à mysql
Je convertis les données AWS JSON en CSV comme ceci
Comment récupérer des données d'image de Flickr avec Python
Migration automatique des données de Yahoo Root Lab vers Strava
Terraform configuré pour lancer AWS Lambda à partir d'Amazon SQS
Envoyer les données du journal du serveur vers Splunk Cloud
Envoyer des données de Python au traitement via une communication socket
DataNitro, implémentation de la fonction de lecture des données de feuille
Supprimer les données DynamoDB après 5 minutes avec AWS Step Functions
Conversion automatique du fichier MySQL Workbench mwb en fichier sql
Jouez pour informer Slack des données environnementales de SensorTag à l'aide d'AWS PaaS via Raspberry Pi3