[PYTHON] Changer la clé de l'objet sur S3 du format de date normal au format Hive

Qu'as-tu écrit

J'ai dit: "Crachons le journal de l'application, les données d'analyse ou S3 pour le moment! Passé? Vous pourrez y réfléchir plus tard, afin de pouvoir le couper avec aaaa-mm-jj pour le moment!"

~ 1 an plus tard ~

J'ai dit: "Pourquoi les données sont-elles stockées dans un format de chemin difficile à analyser?"

C'était une histoire d'essayer de faire quelque chose parce que c'était dans un tel état.

Ce qui me rend heureux

Comme dans l'exemple ci-dessus, si vous avez une sortie vers S3 avec la clé suivante sans tenir compte de l'opération en particulier

s3://BUCKET_NAME/path/to/2020-01-01/log.json

Lorsque je veux l'analyser, lorsque j'utilise Athena, etc. pour interroger le fichier ici, je me retrouve dans une situation où la partition appropriée ne peut pas être appliquée à la date.

Qu'est-ce que ça veut dire? "Analysons les données de janvier 2019 à tous les niveaux!" Même ainsi, dans S3, il est extrêmement difficile de faire une requête comme «2019-01- *», simplement parce que la chaîne de caractères comme «2019-01-01» est la clé.

Par conséquent, nous envisagerons de convertir la méthode de stockage au format S3 au format Hive. Le format Hive est le suivant.

s3://BUCKET_NAME/path/to/year=2020/month=01/date=01/log.json

Si vous enregistrez l'objet avec une telle clé et coupez la partition pour aaaa / mm / jj sur la table Athena, vous pouvez exécuter la requête en la divisant en dates spécifiques dans la clause Where de SQL. Ce sera plus facile à analyser.

Cependant, étant donné que S3 est un format de stockage qui stocke les objets au format clé-valeur, il n'est pas possible de modifier la clé d'objet à écrire en même temps. Par conséquent, j'ai créé un script pour changer le format Clé en Hive à la fois pour l'objet de la période spécifiée et l'ai exécuté à partir de Lambda.

Création de la fonction Lambda

Immédiatement, le Lambda créé est un simple script à 1 fichier, comme illustré ci-dessous.

import os
import boto3
from datetime import datetime, timedelta

# Load Environment Variables
S3_BUCKET_NAME = os.environ['S3_BUCKET_NAME']
S3_BEFORE_KEY = os.environ['S3_BEFORE_KEY']
S3_AFTER_KEY = os.environ['S3_AFTER_KEY']
S3_BEFORE_FORMAT = os.environ['S3_BEFORE_FORMAT']
FROM_DATE = os.environ['FROM_DATE']
TO_DATE = os.environ['TO_DATE']
DELETE_FRAG = os.environ['DELETE_FRAG']

def date_range(from_date: datetime, to_date: datetime):
    """
    Create Generator Range of Date

    Args:
        from_date (datetime) : datetime param of start date
        to_date (datetime) : datetime param of end date
    Returns:
        Generator
    """
    diff = (to_date - from_date).days + 1
    return (from_date + timedelta(i) for i in range(diff))

def pre_format_key():
    """
    Reformat S3 Key Parameter given 

    Args:
        None
    Returns:
        None
    """
    global S3_BEFORE_KEY
    global S3_AFTER_KEY
    if S3_BEFORE_KEY[-1] == '/':
        S3_BEFORE_KEY = S3_BEFORE_KEY[:-1]
    if S3_AFTER_KEY[-1] == '/':
        S3_AFTER_KEY = S3_AFTER_KEY[:-1]


def change_s3_key(date: datetime):
    """
    Change S3 key from datetime format to Hive format at specific date

    Args:
        date (datetime) : target date to change key
    Returns:
        None
    """
    before_date_str = datetime.strftime(date, S3_BEFORE_FORMAT)
    print('Change following date key format : {}'.format(before_date_str))
    before_path = f'{S3_BEFORE_KEY}/{before_date_str}/'
    after_path = "{}/year={}/month={}/date={}".format(
        S3_AFTER_KEY, date.strftime('%Y'), date.strftime('%m'), date.strftime('%d')
    )
    
    s3 = boto3.client('s3')
    response = s3.list_objects_v2(
        Bucket=S3_BUCKET_NAME,
        Delimiter="/",
        Prefix=before_path
    )
    try:
        for content in response["Contents"]:
            key = content['Key']
            file_name = key.split('/')[-1]
            after_key = f'{after_path}/{file_name}'
            s3.copy_object(
                Bucket=S3_BUCKET_NAME,
                CopySource={'Bucket': S3_BUCKET_NAME, 'Key': key},
                Key=after_key
            )
            if DELETE_FRAG == 'True':
                s3.delete_object(Bucket=S3_BUCKET_NAME, Key=key)
    except Exception as e:
        print(e)
        return


def lambda_handler(event, context):
    pre_format_key()
    from_date = datetime.strptime(FROM_DATE, "%Y%m%d")
    to_date = datetime.strptime(TO_DATE, "%Y%m%d")
    for date in date_range(from_date, to_date):
        change_s3_key(date)

Au moment de l'exécution, il est nécessaire de saisir les paramètres suivants dans Lambda.

--Définissez les éléments suivants dans les variables d'environnement

Variable d'environnement valeur Remarques
S3_BUCKET_NAME Nom du compartiment S3
S3_BEFORE_KEY Avant de changer la clé S3 (chemin)/to)
S3_AFTER_KEY Avant de changer la clé S3 (chemin)/to) Même valeur que ci-dessus si le mouvement des touches n'est pas nécessaire
S3_BEFORE_FORMAT Format de date avant le changement %Y-%m-%dFormats que Python datetime peut reconnaître
FROM_DATE date de début(yyyymmdd) Le point de départ de l'objet dont vous souhaitez modifier la clé
TO_DATE Date de fin(yyyymmdd) Point final de l'objet dont vous souhaitez modifier la clé
DELETE_FRAG True/False S'il faut supprimer l'objet d'origine

--Grant Lambda execution Rôle l'autorité d'opération du compartiment cible de S3

Les paramètres nécessaires ont été convertis en variables d'environnement, veuillez donc les définir comme vous le souhaitez en fonction de votre propre environnement. En outre, la gestion des erreurs est lourde et non mise en œuvre. Puisqu'il s'agit d'un script qui n'est exécuté qu'une seule fois par SPOT, il est réduit au minimum. Veuillez corriger si vous êtes intéressé.

résultat

Nous avons changé la clé S3 existante du format de date normal au format Hive et avons pu faciliter l'analyse en toute sécurité.

Comme information supplémentaire, si vous exécutez Glue Cralwer dans le chemin / vers / couche, un catalogue de données qui peut être lu par Athena est automatiquement généré, y compris la partition, de sorte que la vie d'analyse dans Athena sera enrichie.

S'il vous plaît laissez-moi savoir si l'implémentation ici est étrange ou si vous voulez faire plus comme ça! Ce n'est pas un gros problème, mais je vais garder le référentiel public. https://github.com/kzk-maeda/change-s3-key/blob/master/lambda_function.py

Recommended Posts

Changer la clé de l'objet sur S3 du format de date normal au format Hive
[Python] Comment changer le format de la date (format d'affichage)
Modifiez le point décimal de la journalisation de, à.
[Python] Modifier le contrôle du cache des objets téléchargés sur Cloud Storage
Une introduction à l'orientation des objets - changeons l'état interne d'un objet
L'histoire de la copie de données de S3 vers TeamDrive de Google
Script pour changer la description de fasta
Changer l'ordre de PostgreSQL dans Heroku
Essayez de mesurer la position d'un objet sur le bureau (système de coordonnées réel) à partir de l'image de la caméra avec Python + OpenCV
Comment obtenir la clé sur Amazon S3 avec Boto 3, exemple de mise en œuvre, notes
[Forefront of Object Recognition 2020] Un journal de l'installation de Pytorch sur Windows 10 à l'exécution de CornerNet-Lite.
Comment changer l'apparence du champ de clé étrangère non sélectionné dans le formulaire modèle de Django
Changer la résolution d'Ubuntu s'exécutant sur VirtualBox
De l'introduction de pyethapp à l'exécution du contrat
Histoire de passer de Pipenv à la poésie
[AWS S3] Confirmation de l'existence de dossiers sur S3
J'ai essayé de faciliter la modification du paramètre du proxy authentifié sur Jupyter
Le mur lors du passage du service Django de Python 2.7 à la série Python 3
Essayez d'estimer le nombre de likes sur Twitter
Étapes pour calculer la probabilité d'une distribution normale
Script pour obtenir la date d'expiration du certificat SSL
python> datetime> De la chaîne de date (format ISO: 2015-12-09 12:40:08) au type datetime
[Amazon Linux 2] Passage de l'authentification par clé publique à l'authentification par mot de passe
Comment connaître le nombre de GPU de python ~ Remarques sur l'utilisation du multitraitement avec pytorch ~
J'ai essayé de prédire le genre de musique à partir du titre de la chanson sur le réseau neuronal récurrent
J'ai essayé de trier les objets de l'image du plat de steak-① Détection d'objets
Comment calculer la quantité de calcul appris de ABC134-D
Changer le volume de Pepper en fonction de l'environnement environnant (son)
Comment définir la date au format ISO8601 étendu sur l'index Dataframe
Après tout, l'histoire du retour de Linux à Windows
J'ai essayé de changer le script python de 2.7.11 à 3.6.0 sur Windows10