I wrote a Lambda function (Python runtime) that exports CloudWatch Logs to S3. The Lambda can be invoked manually or run on a schedule with CloudWatch Events.
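As a rough sketch of the scheduling side, something like the following could create a daily rule with boto3. The rule name, cron expression, and function ARN are placeholder assumptions, not part of the original setup:

```python
import boto3

events = boto3.client("events")

# Hypothetical daily schedule: 15:00 UTC is 00:00 JST the next day.
events.put_rule(
    Name="log-export-daily",                 # placeholder rule name
    ScheduleExpression="cron(0 15 * * ? *)",
)

# Point the rule at the Lambda (placeholder ARN).
events.put_targets(
    Rule="log-export-daily",
    Targets=[{
        "Id": "log-export-lambda",
        "Arn": "arn:aws:lambda:ap-northeast-1:123456789012:function:log-export",
    }],
)
```

Note that the function also needs a resource-based policy permitting events.amazonaws.com to invoke it.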
With boto3, an export task is started like this:

```python
logs = boto3.client("logs")
response = logs.create_export_task(**kwargs)
```

However, `create_export_task` runs asynchronously, so starting the next export before confirming that the previous one has finished can raise an error. When exporting multiple log groups, be sure to poll `logs.describe_export_tasks(taskId=response["taskId"])` until each task has completed before starting the next one.
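A minimal sketch of that wait loop, continuing from the snippet above (the in-progress status codes are those documented for export tasks; anything else, such as COMPLETED or FAILED, means the task is done):

```python
import time

# Poll until the export task leaves the pending/running states.
while True:
    tasks = logs.describe_export_tasks(taskId=response["taskId"])
    status = tasks["exportTasks"][0]["status"]["code"]
    if status not in ("PENDING", "PENDING_CANCEL", "RUNNING"):
        break
    time.sleep(10)  # seconds between polls
```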
The values of the environment variables are as follows.
| Variable | Value |
|---|---|
| BUCKET_NAME | Name of the destination S3 bucket for the logs |
| WAITING_TIME | Polling interval in seconds (e.g. 10) |
| LOG_GROUPS | CloudWatch Logs log group names, joined with commas |
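For illustration, hypothetical values might look like this (the bucket and log group names are placeholders):

```python
import os

# Hypothetical example values:
#   BUCKET_NAME  = "my-log-archive-bucket"
#   WAITING_TIME = "10"
#   LOG_GROUPS   = "/aws/lambda/funcA,/aws/lambda/funcB"
# LOG_GROUPS is split on commas into individual log group names:
log_groups = os.environ["LOG_GROUPS"].split(",")
```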
```python
# -*- coding: utf-8 -*-
from datetime import datetime, timezone, timedelta
import os
import time
import logging
import traceback

import boto3

# Logger settings
logger = logging.getLogger()
logger.setLevel(os.getenv("LOG_LEVEL", logging.DEBUG))

logs = boto3.client("logs")

BUCKET_NAME = os.environ["BUCKET_NAME"]
WAITING_TIME = int(os.environ["WAITING_TIME"])

# Set the timezone to Japan time (JST)
JST = timezone(timedelta(hours=9), "JST")

# Date format used in the S3 prefix
DATE_FORMAT = "%Y-%m-%d"


def lambda_handler(event, context):
    """
    Export one day's worth of CloudWatch Logs to S3.
    The target window is the previous day,
    00:00:00.000000 to 23:59:59.999999 JST.
    """
    try:
        # Yesterday 23:59:59.999999
        tmp_today = datetime.now(JST).replace(
            hour=0, minute=0, second=0, microsecond=0
        ) - timedelta(microseconds=1)
        # Yesterday 00:00:00.000000
        tmp_yesterday = (tmp_today - timedelta(days=1)) + timedelta(microseconds=1)
        # Used as part of the prefix when writing the logs to S3
        target_date = tmp_yesterday.strftime(DATE_FORMAT)
        # Convert to Unix timestamps in milliseconds, the unit the API expects
        today = int(tmp_today.timestamp() * 1000)
        yesterday = int(tmp_yesterday.timestamp() * 1000)

        # Get the CloudWatch Logs log groups from the environment variable
        logGroups = os.environ["LOG_GROUPS"].split(",")
        for logGroupName in logGroups:
            try:
                keys = ["logGroupName", "yesterday", "today", "target_date"]
                values = [logGroupName, yesterday, today, target_date]
                payload = dict(zip(keys, values))

                # Start the export task
                response = logs.create_export_task(
                    logGroupName=payload["logGroupName"],
                    fromTime=payload["yesterday"],
                    to=payload["today"],
                    destination=BUCKET_NAME,
                    destinationPrefix="Logs" + payload["logGroupName"] + "/" + payload["target_date"]
                )

                # Wait for the export task to finish before starting the next one
                taskId = response["taskId"]
                while True:
                    response = logs.describe_export_tasks(taskId=taskId)
                    status = response["exportTasks"][0]["status"]["code"]
                    # Break once the task is no longer pending or running
                    if status not in ("PENDING", "PENDING_CANCEL", "RUNNING"):
                        logger.info(f"taskId {taskId} has finished exporting")
                        break
                    logger.info(f"taskId {taskId} is still exporting")
                    time.sleep(WAITING_TIME)
            except Exception as e:
                traceback.print_exc()
                logger.warning(f"type = {type(e)} , message = {e}", exc_info=True)
    except Exception as e:
        traceback.print_exc()
        logger.error(f"type = {type(e)} , message = {e}", exc_info=True)
        raise
```
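For a quick local smoke test (assuming AWS credentials and the three environment variables above are set; the event and context arguments are unused by the handler):

```python
if __name__ == "__main__":
    # Invoke the handler directly; event and context are not used.
    lambda_handler({}, None)
```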