J'ai utilisé AWS Lambda (le runtime est Python) pour écrire le code source afin de générer CloudWatch Logs vers S3. En passant, vous pouvez penser à exécuter Lambda manuellement ou régulièrement avec CloudWatch Event.
Pour boto3
logs = boto3.client("logs") response = logs.create_export_task(**kwargs)
La sortie de journal fonctionne avec, mais create_export_task est exécutée de manière asynchrone, donc si vous effectuez la sortie de journal suivante sans confirmer la fin du traitement, une erreur peut se produire. Ainsi, lorsque vous sortez plusieurs journaux, assurez-vous de vérifier si la sortie du journal est terminée.
logs.describe_export_tasks(taskId = response["taskId"])
Insérons le traitement de.
Les valeurs des variables d'environnement sont les suivantes.
variable | valeur |
---|---|
BUCKET_NAME | Nom du compartiment S3 de destination de sortie du journal |
WAITING_TIME | 10 |
LOG_GROUPS | CloudWatchLogGroup,Connectez-vous avec une pause |
# -*- coding: utf-8 -*-
from datetime import datetime,timezone,timedelta
import os
import boto3
import time
import logging
import traceback
#Paramètres du journal
logger = logging.getLogger()
logger.setLevel(os.getenv('LOG_LEVEL', logging.DEBUG))
logs = boto3.client("logs")
BUCKET_NAME = os.environ["BUCKET_NAME"]
WAITING_TIME = int(os.environ["WAITING_TIME"])
#Définir le fuseau horaire sur l'heure du Japon (JST)
JST = timezone(timedelta(hours=9),"JST")
#Type de date lors de la sortie des journaux vers S3
DATE_FORMAT = "%Y-%m-%d"
def lambda_handler(event, context):
"""
Exportez l'équivalent d'une journée de CloudWatch Logs vers S3.
Le temps cible est le suivant.
AM 00:00:00.000000 ~ PM 23:59:59.999999
"""
try:
#Hier PM23:59:59.999999
tmp_today = datetime.now(JST).replace(hour=0,minute=0,second=0,microsecond=0) - timedelta(microseconds=1)
#Hier AM00:00:00.000000
tmp_yesterday = (tmp_today - timedelta(days=1)) + timedelta(microseconds=1)
#Utilisé comme préfixe lors de la sortie des journaux S3
target_date = tmp_yesterday.strftime(DATE_FORMAT)
#Convertir en type d'horodatage pour la sortie du journal (prendre jusqu'à microsecondes)
today = int(tmp_today.timestamp() * 1000)
yesterday = int(tmp_yesterday.timestamp() * 1000)
#Obtenir CloudWatchLogGroup à partir de la variable d'environnement
logGroups = os.environ["LOG_GROUPS"].split(",")
for logGroupName in logGroups:
try:
keys = ["logGroupName","yesterday","today","target_date"]
values = [logGroupName,yesterday,today,target_date]
payload = dict(zip(keys,values))
#Exécuter la sortie du journal
response = logs.create_export_task(
logGroupName = payload["logGroupName"],
fromTime = payload["yesterday"],
to = payload["today"],
destination = BUCKET_NAME,
destinationPrefix = "Logs" + payload["logGroupName"] + "/" + payload["target_date"]
)
#Attendez que la sortie du journal se termine.
taskId = response["taskId"]
while True:
response = logs.describe_export_tasks(
taskId = taskId
)
status = response["exportTasks"][0]["status"]["code"]
#Pause si l'exécution de la tâche est terminée
if status != "PENDING" and status != "PENDING_CANCEL" and status != "RUNNING":
logger.info(f"taskId {taskId} has finished exporting")
break
else:
logger.info(f"taskId {taskId} is now exporting")
time.sleep(WAITING_TIME)
continue
except Exception as e:
traceback.print_exc()
logger.warning(f"type = {type(e)} , message = {e}",exc_info=True)
except Exception as e:
traceback.print_exc()
logger.error(f"type = {type(e)} , message = {e}",exc_info=True)
raise
Recommended Posts