Ich sagte: "Lassen Sie uns vorerst das App-Protokoll, die Analysedaten oder S3 ausspucken! Bestanden? Sie können später darüber nachdenken, damit Sie es vorerst mit" JJJJ-MM-TT "schneiden können!"
~ 1 Jahr später ~
Ich sagte: "Warum werden die Daten in einem Pfadformat gespeichert, das schwer zu analysieren ist?"
Es war eine Geschichte, in der versucht wurde, etwas zu tun, weil es sich in einem solchen Zustand befand.
Wie im obigen Beispiel, wenn Sie mit der folgenden Taste an S3 ausgegeben haben, ohne die Operation besonders zu berücksichtigen
s3://BUCKET_NAME/path/to/2020-01-01/log.json
Wenn ich es analysieren möchte, wenn ich Athena usw. verwende, um die Datei hier abzufragen, gerate ich in eine Situation, in der die entsprechende Partition nicht auf das Datum angewendet werden kann.
Was bedeutet das? "Lassen Sie uns die Daten vom Januar 2019 auf breiter Front analysieren!" Trotzdem ist es in S3 äußerst schwierig, eine Abfrage wie "2019-01- *" durchzuführen, nur weil die Zeichenfolge wie "2019-01-01" der Schlüssel ist.
Daher werden wir erwägen, die Speichermethode in S3 in das Hive-Format zu konvertieren. Das Hive-Format lautet wie folgt.
s3://BUCKET_NAME/path/to/year=2020/month=01/date=01/log.json
Wenn Sie das Objekt mit einem solchen Schlüssel speichern und die Partition für JJJJ / MM / TT in der Athena-Tabelle ausschneiden, können Sie die Abfrage ausführen, indem Sie sie in der Where-Klausel von SQL in bestimmte Daten unterteilen. Es wird einfacher zu analysieren sein.
Da S3 jedoch ein Speicherformat ist, in dem Objekte im Schlüsselwertformat gespeichert werden, ist es nicht möglich, den Schlüssel für das sofortige Schreiben des Objekts zu ändern. Aus diesem Grund habe ich ein Skript erstellt, um das Format "Schlüssel in Hive" für das Objekt des angegebenen Zeitraums sofort zu ändern, und es von Lambda ausgeführt.
Das erstellte Lambda ist sofort ein einfaches 1-Datei-Skript, wie unten gezeigt.
import os
import boto3
from datetime import datetime, timedelta
# Load Environment Variables
S3_BUCKET_NAME = os.environ['S3_BUCKET_NAME']
S3_BEFORE_KEY = os.environ['S3_BEFORE_KEY']
S3_AFTER_KEY = os.environ['S3_AFTER_KEY']
S3_BEFORE_FORMAT = os.environ['S3_BEFORE_FORMAT']
FROM_DATE = os.environ['FROM_DATE']
TO_DATE = os.environ['TO_DATE']
DELETE_FRAG = os.environ['DELETE_FRAG']
def date_range(from_date: datetime, to_date: datetime):
"""
Create Generator Range of Date
Args:
from_date (datetime) : datetime param of start date
to_date (datetime) : datetime param of end date
Returns:
Generator
"""
diff = (to_date - from_date).days + 1
return (from_date + timedelta(i) for i in range(diff))
def pre_format_key():
"""
Reformat S3 Key Parameter given
Args:
None
Returns:
None
"""
global S3_BEFORE_KEY
global S3_AFTER_KEY
if S3_BEFORE_KEY[-1] == '/':
S3_BEFORE_KEY = S3_BEFORE_KEY[:-1]
if S3_AFTER_KEY[-1] == '/':
S3_AFTER_KEY = S3_AFTER_KEY[:-1]
def change_s3_key(date: datetime):
"""
Change S3 key from datetime format to Hive format at specific date
Args:
date (datetime) : target date to change key
Returns:
None
"""
before_date_str = datetime.strftime(date, S3_BEFORE_FORMAT)
print('Change following date key format : {}'.format(before_date_str))
before_path = f'{S3_BEFORE_KEY}/{before_date_str}/'
after_path = "{}/year={}/month={}/date={}".format(
S3_AFTER_KEY, date.strftime('%Y'), date.strftime('%m'), date.strftime('%d')
)
s3 = boto3.client('s3')
response = s3.list_objects_v2(
Bucket=S3_BUCKET_NAME,
Delimiter="/",
Prefix=before_path
)
try:
for content in response["Contents"]:
key = content['Key']
file_name = key.split('/')[-1]
after_key = f'{after_path}/{file_name}'
s3.copy_object(
Bucket=S3_BUCKET_NAME,
CopySource={'Bucket': S3_BUCKET_NAME, 'Key': key},
Key=after_key
)
if DELETE_FRAG == 'True':
s3.delete_object(Bucket=S3_BUCKET_NAME, Key=key)
except Exception as e:
print(e)
return
def lambda_handler(event, context):
pre_format_key()
from_date = datetime.strptime(FROM_DATE, "%Y%m%d")
to_date = datetime.strptime(TO_DATE, "%Y%m%d")
for date in date_range(from_date, to_date):
change_s3_key(date)
Zum Zeitpunkt der Ausführung müssen die folgenden Einstellungen in Lambda eingegeben werden.
--Stellen Sie Folgendes in den Umgebungsvariablen ein
Umgebungsvariable | Wert | Bemerkungen |
---|---|---|
S3_BUCKET_NAME | S3 Bucket Name | |
S3_BEFORE_KEY | Vor dem Ändern der S3-Taste (Pfad)/to) | |
S3_AFTER_KEY | Vor dem Ändern der S3-Taste (Pfad)/to) | Gleicher Wert wie oben, wenn keine Tastenbewegung erforderlich ist |
S3_BEFORE_FORMAT | Datumsformat vorab ändern | %Y-%m-%d Formate, die Python datetime erkennen kann |
FROM_DATE | Anfangsdatum(yyyymmdd) | Der Startpunkt des Objekts, für das Sie den Schlüssel ändern möchten |
TO_DATE | Endtermin(yyyymmdd) | Endpunkt des Objekts, für das Sie den Schlüssel ändern möchten |
DELETE_FRAG | True/False | Gibt an, ob das ursprüngliche Objekt gelöscht werden soll |
--Grant Lambda-Ausführung Rolle der Operationsautorität des Ziel-Buckets von S3
Die erforderlichen Einstellungen wurden in Umgebungsvariablen vorgenommen. Stellen Sie sie daher entsprechend Ihrer eigenen Umgebung nach Ihren Wünschen ein. Außerdem ist die Fehlerbehandlung umständlich und nicht implementiert. Da es sich um ein Skript handelt, das von SPOT nur einmal ausgeführt wird, wird die Implementierung auf ein Minimum beschränkt. Bitte korrigieren Sie bei Interesse.
Wir haben den vorhandenen S3-Schlüssel vom normalen Datumsformat in das Hive-Format geändert und konnten die sichere Analyse vereinfachen.
Wenn Sie Glue Cralwer im Pfad / zu / Layer ausführen, wird automatisch ein Datenkatalog generiert, der von Athena gelesen werden kann, einschließlich Partition, sodass die Lebensdauer der Analyse in Athena angereichert wird.
Bitte lassen Sie mich wissen, ob die Implementierung hier seltsam ist oder ob Sie mehr davon tun möchten! Es ist keine große Sache, aber ich werde das Repository öffentlich halten. https://github.com/kzk-maeda/change-s3-key/blob/master/lambda_function.py