[PYTHON] [AWS] Migrieren Sie Daten von DynamoDB nach Aurora MySQL

Ich habe manchmal Daten von DynamoDB nach Aurora MySQL migriert, daher werde ich schreiben, wie es geht. Bitte beachten Sie, dass die Datenmigration nicht für alle Tabellen eine Datenmigration ist, da es sich nur um eine Migration von Daten handelt, die in einer Tabelle gespeichert sind.

screenshot 2020-05-09 12.40.26.png

Methode

Ich denke, es gibt verschiedene Möglichkeiten, von DynamoDB zu Aurora MySQL zu migrieren, aber ich habe den Code für eine schnelle und zuverlässige Migration in verfügbarem (?) Python geschrieben. Obwohl es verfügbar ist, können Daten möglicherweise erneut migriert werden, sodass ich sie wiederverwendbar gemacht habe.

Implementierung

Code

Erstellen Sie zunächst ein Modell für DynamoDB und Aurora MySQL Mit der Standard-Bibliotheksdatenklasse können Sie präzise und übersichtlich schreiben.

Aus diesem Grund ist die Scanfunktion so konzipiert, dass sie rekursiv ausgeführt wird, wenn nicht alle Elemente gleichzeitig abgerufen werden können, da beim Abrufen aller Datensätze aus der DynamoDB-Tabelle Einschränkungen bestehen.

Aus Offiziellen Dokumenten

A single Scan operation reads up to the maximum number of items set (if using the Limit parameter) or a maximum of 1 MB of data and then apply any filtering to the results using FilterExpression . If LastEvaluatedKey is present in the response, you need to paginate the result set.

python:models/dynamodb.py


from decimal import Decimal
from dataclasses import dataclass, asdict
from typing import Any, ClassVar, Dict, List

import boto3

from src.utils import convert_to_datetime


@dataclass
class BaseDynamoDB:
    table_name: ClassVar[str]
    hash_key: ClassVar[str]

    @classmethod
    def get_client(cls) -> Any:
        client = boto3.resource("dynamodb")
        return client.Table(cls.table_name)

    @classmethod
    def scan(
        cls, *, recursive: bool = False, exclusive_start_key: Dict = {},
    ) -> List["BaseDynamoDB"]:
        """Holen Sie sich einige oder alle Elemente aus der DynamoDB-Tabelle

Setzen Sie rekursiv auf True, um alle Datensätze abzurufen

        Args:
            recursive (bool):
Der Standardwert ist False, um einige Daten abzurufen
Auf True setzen, um alle Datensätze abzurufen
            exclusive_start_key (Dict):Primärschlüssel für das erste zu scannende Objekt

        Returns:
            List["BaseDynamoDB"]:Liste der Tabellenmodellinstanzen
        """

        client = cls.get_client()
        options = {}

        if exclusive_start_key:
            options.update(exclusive_start_key)

        response = client.scan(**options)
        items = list(map(lambda item: cls(**item), response["Items"]))  # type: ignore

        if recursive and "LastEvaluatedKey" in response:
            tmp = cls.scan(
                recursive=True,
                exclusive_start_key=response["LastEvaluatedKey"],
            )
            items.extend(tmp)

        return items


@dataclass
class Qiita(BaseDynamoDB):
    """Fiktiver Tisch für Qiita"""

    table_name: ClassVar[str] = "qiita"
    hash_key: ClassVar[str] = "user_id"

    user_id: str
    created_at: int
    updated_at: int
    memo: str = ""

    def __post_init__(self) -> None:
        for attr in ("updated_at", "created_at"):
            v = getattr(self, attr)
            if isinstance(v, Decimal):
                setattr(self, attr, convert_to_datetime(str(v)))

    def to_dict(self) -> Dict[str, Any]:
        """Geben Sie die Instanz als Wörterbuch zurück

        Returns:
            Dict[str, Any]
        """

        return asdict(self)

models/aurora.py


from datetime import datetime
from dataclasses import asdict, dataclass, field, InitVar
from typing import Any, ClassVar, Dict


@dataclass
class Qiita:
    """Fiktiver Tisch für Qiita"""

    table_name: ClassVar[str] = "qiita"
    primary_key: ClassVar[str] = "user_id"

    user_id: str

    #DynamoDB-Spalte für Datums- und Zeitverwaltung
    created_at: InitVar[datetime]
    updated_at: InitVar[datetime]
    #Aurora MySQL-Spalte für Datums- und Zeitverwaltung
    registration_date: datetime = field(init=False)
    update_date: datetime = field(init=False)

    memo: str = ""
    registration_id: str = "DynamoDB"
    update_id: str = "DynamoDB"

    def __post_init__(
        self, created_at: datetime, updated_at: datetime
    ) -> None:
        self.registration_date = created_at
        self.update_date = updated_at

    def to_dict(self) -> Dict[str, Any]:
        """Geben Sie die Instanz als Wörterbuch zurück

        Returns:
            Dict[str, Any]
        """

        result = asdict(self)
        result["registration_date"] = self.registration_date
        result["update_date"] = self.update_date

        return result

connection.py


import os
from contextlib import contextmanager
from typing import Iterator

import pymysql
from pymysql.connections import Connection

from src.utils import get_logger


logger = get_logger(__name__)


AURORA_DB = os.environ["AURORA_DB"]
AURORA_HOST = os.environ["AURORA_HOST"]
AURORA_PORT = int(os.environ["AURORA_PORT"])
AURORA_USER = os.environ["AURORA_USER"]
AURORA_PASSWORD = os.environ["AURORA_PASSWORD"]


@contextmanager
def connect() -> Iterator[Connection]:
    """Herstellen einer Verbindung mit Aurora MySQL

    Returns:
        Iterator[Connection]
    """

    try:
        conn = pymysql.connect(
            db=AURORA_DB,
            host=AURORA_HOST,
            port=AURORA_PORT,
            user=AURORA_USER,
            password=AURORA_PASSWORD,
            charset="utf8mb4",
            cursorclass=pymysql.cursors.DictCursor,
            connect_timeout=120,
        )
    except Exception as err:
        logger.error("Verbindung zu Aurora fehlgeschlagen")
        raise err

    try:
        yield conn
    finally:
        conn.close()

Unten ist das Hauptskript Wenn Sie den Modellnamen ändern, können Sie ihn beliebig oft wiederverwenden.

main.py


from src.db.connection import connect
from src.db.sql import INSERT_QIITA
from src.models.dynamodb import Qiita as DynamoDBQiita
from src.models.mysql import Qiita as MySQLQiita
from src.utils import get_logger

logger = get_logger(__name__)


def main():
    logger.info("START")

    items = DynamoDBQiita.scan(recursive=True)
    logger.info(f"{len(items)}Erworben")

    params = list(
        map(
            lambda item: MySQLQiita(**item.to_dict()).to_dict(),
            items,
        )
    )

    try:
        with connect() as conn:
            with conn.cursor() as cursor:
                cursor.executemany(INSERT_QIITA, params)
                count = cursor.rowcount
            conn.commit()
    except Exception as err:
        logger.error(err)
        logger.error("INSERT-Fehler")
    else:
        logger.info(f"{count}Erfolgreich EINFÜGEN")

    logger.info("END")


if __name__ == "__main__":
    main()

Migration

Vorbereitung

IAM

Erstellen Sie einen IAM-Benutzer mit einer schreibgeschützten DynamoDB-Rolle und erhalten Sie eine Zugriffsschlüssel-ID und einen geheimen Zugriffsschlüssel. Aurora MySQL ist nicht erforderlich, da es sich wie gewöhnliches MySQL mit Benutzername und Passwort authentifiziert

.env

Bereiten Sie eine .env-Datei vor, damit pipenv die Anmeldeinformationsdaten liest

.env


AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_DEFAULT_REGION=
AURORA_DB=
AURORA_HOST=
AURORA_PORT=
AURORA_USER=
AURORA_PASSWORD=

Makefile

Ich habe das Makefile geschrieben, damit ich dasselbe Skript für die Bereitstellung und Produktion verwenden und einfach einen einfachen Befehl ausführen kann. Wenn ich ein Python-Skript mit pipenv ausführe, möchte ich den geheimen Schlüssel aus der .env-Datei lesen, daher habe ich für jede Umgebung einen symbolischen Link von .env zur .env. $ (Stage) -Datei hergestellt.

Makefile


stage = stg

.PHONY: dymy
dymy:
	ln -sf .env.$(stage) .env
	pipenv run python -m src.main

Wenn Sie ein Python-Skript mit pipenv ausführen möchten, können Sie es im Skriptabschnitt von Pipfile schreiben. Es wird jedoch empfohlen, Makefile zu schreiben, wenn Sie eine andere Verarbeitung wie diese durchführen möchten. Nun, Shell-Skript ist in Ordnung, aber ich persönlich bevorzuge Makefile

Lebewohl, DynamoDB, willkommen Aurora MySQL

Geben Sie bei der Ausführung in einer Produktionsumgebung prod im Argument stage an.

make dymy stage=prod

Damit ist die Datenmigration von DynamoDB zu Aurora MySQL abgeschlossen. Wenn Sie wirklich überprüfen möchten, ob die Migration erfolgreich war, stimmt die Anzahl der in der Tabelle der DynamoDB-Quelle gespeicherten Elemente mit der Anzahl der Zählungen überein, die durch Ausführen von "SELECT count (*) FROM ~" auf dem Ziel Aurora MySQL erhalten wurden. Bitte sehen Sie, ob

Referenz

Recommended Posts

[AWS] Migrieren Sie Daten von DynamoDB nach Aurora MySQL
Speichern Sie SQLite3-Daten und migrieren Sie zu MySQL
Migrieren Sie von require.txt zu pipenv
Von der Installation von Elasticsearch bis zur Dateneingabe
Migration von direct_to_template zu TemplateView nicht möglich
Ändern Sie die AWS EC2-Instanz von t2 in t3
Empfangen Sie Textdaten von MySQL mit Python
SIGNATE Quest ① Vom Lesen der Daten bis zur Vorverarbeitung
Stellen Sie von Flask aus eine Verbindung zum MySQL-Container von Docker her
[Data Science-Grundlagen] Ich habe versucht, mit Python von CSV auf MySQL zu speichern
Wenn Sie dasselbe wie zuvor verwenden, können Sie dies visualisieren, indem Sie Japan, China und Südkorea ausschließen, in denen viele Menschen infiziert sind. In diesem Fall lauten die Daten für "2020/02/21" wie folgt. Besonders auffällig sind Südostasien, Europa und Nordamerika. Ich hoffe es konvergiert so schnell wie möglich. .. .. Migrieren Sie Python, Pandas, Python3, Zeitreihendaten und eigene CMS-Daten nach WordPress
Herstellen einer Verbindung von Python zu MySQL unter CentOS 6.4
[Kaggle] Vom Lesen der Daten bis zur Vorverarbeitung und Codierung
Senden Sie Daten von Raspberry Pi mit AWS IOT
Abrufen von Daten von MacNote3 und Migrieren zu Write
[Python] Fluss vom Web-Scraping zur Datenanalyse
Es kann keine Verbindung zu MySQL über die Docker-Umgebung (Debian) hergestellt werden.
Summe von 1 bis 10
Stellen Sie eine Verbindung zu MySQL her
Ich konvertiere AWS JSON-Daten wie folgt in CSV
So kratzen Sie Bilddaten von Flickr mit Python
Automatische Datenmigration vom Yahoo Root Lab nach Strava
Terraform konfiguriert, um AWS Lambda von Amazon SQS aus zu starten
Senden Sie Protokolldaten vom Server an Splunk Cloud
Senden Sie Daten von Python über die Socket-Kommunikation an Processing
DataNitro, Implementierung einer Funktion zum Lesen von Daten aus dem Blatt
Löschen Sie DynamoDB-Daten nach 5 Minuten mit den AWS-Schrittfunktionen
Automatische Konvertierung von der MySQL Workbench-MWB-Datei in eine SQL-Datei
Spielen Sie, um Slack mithilfe von AWS PaaS über Raspberry Pi3 über Umgebungsdaten von SensorTag zu informieren