[PYTHON] [AWS] Migrate data from DynamoDB to Aurora MySQL

I recently migrated data from DynamoDB to Aurora MySQL, so I will write down how to do it. Note that this is not a migration of every table; it covers only the data stored in a single table.


Method

There are several ways to migrate from DynamoDB to Aurora MySQL, but for a quick and reliable migration I wrote a throwaway (?) Python script. Even though it is throwaway, the data may need to be migrated again someday, so I made it reusable.

Implementation

Code

First, create models for DynamoDB and Aurora MySQL. Using dataclass from the standard library, you can write them concisely and neatly.

One key point: the scan function calls itself recursively to handle the case where not all records can be fetched from the DynamoDB table at once because of the scan size limit.

From the official documentation:

A single Scan operation reads up to the maximum number of items set (if using the Limit parameter) or a maximum of 1 MB of data and then applies any filtering to the results using FilterExpression. If LastEvaluatedKey is present in the response, you need to paginate the result set.

models/dynamodb.py


from datetime import datetime
from decimal import Decimal
from dataclasses import dataclass, asdict
from typing import Any, ClassVar, Dict, List, Optional

import boto3

from src.utils import convert_to_datetime


@dataclass
class BaseDynamoDB:
    table_name: ClassVar[str]
    hash_key: ClassVar[str]

    @classmethod
    def get_client(cls) -> Any:
        client = boto3.resource("dynamodb")
        return client.Table(cls.table_name)

    @classmethod
    def scan(
        cls, *, recursive: bool = False, exclusive_start_key: Optional[Dict] = None,
    ) -> List["BaseDynamoDB"]:
        """Get some or all items from the DynamoDB table

        Args:
            recursive (bool): Defaults to False, which fetches only a single
                page of results. Set to True to fetch all records.
            exclusive_start_key (Optional[Dict]): Primary key of the first
                item to scan.

        Returns:
            List["BaseDynamoDB"]: List of table model instances
        """

        client = cls.get_client()
        options = {}

        if exclusive_start_key:
            # boto3 expects the pagination key under the ExclusiveStartKey parameter
            options["ExclusiveStartKey"] = exclusive_start_key

        response = client.scan(**options)
        items = list(map(lambda item: cls(**item), response["Items"]))  # type: ignore

        if recursive and "LastEvaluatedKey" in response:
            tmp = cls.scan(
                recursive=True,
                exclusive_start_key=response["LastEvaluatedKey"],
            )
            items.extend(tmp)

        return items


@dataclass
class Qiita(BaseDynamoDB):
    """Fictitious table for Qiita"""

    table_name: ClassVar[str] = "qiita"
    hash_key: ClassVar[str] = "user_id"

    user_id: str
    # Numeric (Decimal) in DynamoDB; converted to datetime in __post_init__
    created_at: datetime
    updated_at: datetime
    memo: str = ""

    def __post_init__(self) -> None:
        for attr in ("updated_at", "created_at"):
            v = getattr(self, attr)
            if isinstance(v, Decimal):
                setattr(self, attr, convert_to_datetime(str(v)))

    def to_dict(self) -> Dict[str, Any]:
        """Return the instance as a dictionary

        Returns:
            Dict[str, Any]
        """

        return asdict(self)
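
By the way, convert_to_datetime and get_logger are imported from src.utils but are not shown in the original post. Below is a minimal sketch of what they might look like, assuming created_at / updated_at are stored in DynamoDB as Unix-timestamp numbers (which boto3 returns as Decimal):

src/utils.py


import logging
from datetime import datetime


def convert_to_datetime(value: str) -> datetime:
    # Assumption: value is a Unix-timestamp string such as "1588993226"
    return datetime.fromtimestamp(float(value))


def get_logger(name: str) -> logging.Logger:
    # Attach a handler only once so repeated calls do not duplicate output
    logger = logging.getLogger(name)
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
        )
        logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger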

models/aurora.py


from datetime import datetime
from dataclasses import asdict, dataclass, field, InitVar
from typing import Any, ClassVar, Dict


@dataclass
class Qiita:
    """Fictitious table for Qiita"""

    table_name: ClassVar[str] = "qiita"
    primary_key: ClassVar[str] = "user_id"

    user_id: str

    # DynamoDB date and time columns
    created_at: InitVar[datetime]
    updated_at: InitVar[datetime]
    # Aurora MySQL date and time columns
    registration_date: datetime = field(init=False)
    update_date: datetime = field(init=False)

    memo: str = ""
    registration_id: str = "DynamoDB"
    update_id: str = "DynamoDB"

    def __post_init__(
        self, created_at: datetime, updated_at: datetime
    ) -> None:
        self.registration_date = created_at
        self.update_date = updated_at

    def to_dict(self) -> Dict[str, Any]:
        """Return the instance as a dictionary

        Returns:
            Dict[str, Any]
        """

        result = asdict(self)
        result["registration_date"] = self.registration_date
        result["update_date"] = self.update_date

        return result

db/connection.py


import os
from contextlib import contextmanager
from typing import Iterator

import pymysql
from pymysql.connections import Connection

from src.utils import get_logger


logger = get_logger(__name__)


AURORA_DB = os.environ["AURORA_DB"]
AURORA_HOST = os.environ["AURORA_HOST"]
AURORA_PORT = int(os.environ["AURORA_PORT"])
AURORA_USER = os.environ["AURORA_USER"]
AURORA_PASSWORD = os.environ["AURORA_PASSWORD"]


@contextmanager
def connect() -> Iterator[Connection]:
    """Establishing a connection with Aurora MySQL

    Returns:
        Iterator[Connection]
    """

    try:
        conn = pymysql.connect(
            db=AURORA_DB,
            host=AURORA_HOST,
            port=AURORA_PORT,
            user=AURORA_USER,
            password=AURORA_PASSWORD,
            charset="utf8mb4",
            cursorclass=pymysql.cursors.DictCursor,
            connect_timeout=120,
        )
    except Exception:
        logger.error("Failed to connect to Aurora")
        raise

    try:
        yield conn
    finally:
        conn.close()

Below is the main script. If you change the model names, you can reuse it as many times as you like.

main.py


from src.db.connection import connect
from src.db.sql import INSERT_QIITA
from src.models.dynamodb import Qiita as DynamoDBQiita
from src.models.aurora import Qiita as MySQLQiita
from src.utils import get_logger

logger = get_logger(__name__)


def main() -> None:
    logger.info("START")

    items = DynamoDBQiita.scan(recursive=True)
    logger.info(f"Fetched {len(items)} items")

    params = list(
        map(
            lambda item: MySQLQiita(**item.to_dict()).to_dict(),
            items,
        )
    )

    try:
        with connect() as conn:
            with conn.cursor() as cursor:
                cursor.executemany(INSERT_QIITA, params)
                count = cursor.rowcount
            conn.commit()
    except Exception as err:
        logger.error(err)
        logger.error("INSERT failure")
    else:
        logger.info(f"{count}Successful INSERT")

    logger.info("END")


if __name__ == "__main__":
    main()
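
INSERT_QIITA, imported from src.db.sql, is also not shown in the original post. Here is a plausible sketch assuming the column names of the Aurora-side model above; pymysql's executemany fills each %(name)s placeholder from the corresponding key of every parameter dictionary.

db/sql.py


# Hypothetical INSERT statement for the fictitious qiita table
INSERT_QIITA = """
INSERT INTO qiita (
    user_id, registration_date, update_date,
    memo, registration_id, update_id
) VALUES (
    %(user_id)s, %(registration_date)s, %(update_date)s,
    %(memo)s, %(registration_id)s, %(update_id)s
)
"""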

Migration

Preparation

IAM

Create an IAM user with a DynamoDB read-only policy and obtain an access key ID and secret access key. Aurora MySQL authenticates with a username and password like ordinary MySQL, so no IAM user is needed on that side.
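
To quickly confirm that the read-only credentials work, you can ask DynamoDB for the table's approximate item count, for example like this (a sketch using the fictitious qiita table from above):


import boto3

# item_count comes from DescribeTable and is refreshed by DynamoDB
# only about every six hours, so treat it as approximate
table = boto3.resource("dynamodb").Table("qiita")
print(table.item_count)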

.env

Prepare a .env file so that pipenv can read the credentials.

.env


AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_DEFAULT_REGION=
AURORA_DB=
AURORA_HOST=
AURORA_PORT=
AURORA_USER=
AURORA_PASSWORD=

Makefile

I wrote a Makefile so that the same script can be used for staging and production by running a single simple command. Since I want pipenv to read the secrets from the .env file when running the Python script, the target creates a symbolic link from .env to the per-environment .env.$(stage) file.

Makefile


stage = stg

.PHONY: dymy
dymy:
	ln -sf .env.$(stage) .env
	pipenv run python -m src.main

As an aside, if you just want to run a Python script with pipenv, you can register it in the scripts section of the Pipfile, but I recommend a Makefile when, as here, you want to do some extra processing first. A shell script would work too, but I personally prefer a Makefile.
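
For reference, the Pipfile route would look something like this (an illustrative excerpt, not taken from the actual project):

Pipfile


[scripts]
dymy = "python -m src.main"

You would then run it with pipenv run dymy, but the symbolic-link step would still have to happen somewhere else.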

Farewell DynamoDB, Welcome Aurora MySQL

When executing in the production environment, specify prod in the stage argument.

make dymy stage=prod

This completes the data migration from DynamoDB to Aurora MySQL. If you want to check whether the migration succeeded, verify that the number of items stored in the source DynamoDB table matches the count returned by SELECT COUNT(*) FROM ~ on the destination Aurora MySQL.
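
For example, a quick sanity check could reuse the models above (a sketch; the table name follows the fictitious qiita model):


from src.db.connection import connect
from src.models.dynamodb import Qiita as DynamoDBQiita

# Count the items on the DynamoDB side (scans the whole table)
dynamo_count = len(DynamoDBQiita.scan(recursive=True))

# Count the rows on the Aurora MySQL side
with connect() as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT COUNT(*) AS cnt FROM qiita")
        mysql_count = cursor.fetchone()["cnt"]

assert dynamo_count == mysql_count, f"{dynamo_count} != {mysql_count}"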
