[Python] Regularly export from CloudWatch Logs to S3 with Lambda

This article describes developing a Lambda function in Python that periodically exports the logs saved in CloudWatch Logs to the S3 bucket created in [AWS] CloudFormation to create S3 buckets and set lifecycle rules.

I have created a GitHub repository so you can try it easily: homoluctus/lambda-cwlogs-s3

Requirements

- Export the previous day's logs every day at 14:00 JST
- Multiple log groups can be exported

Development environment

- Language: Python
- Development libraries: (see the repository)
- Production library: boto3
- Deployment tool: Serverless Framework
- AWS services: Lambda, CloudWatch Logs, CloudWatch Events, S3

code

Since it is impractical to include all the code, only the main parts are shown for explanation. See the repository for the complete source code.

Configuration class for the log groups to export

Multiple log groups can be configured for export, similar to a TypeScript interface. To add a log group to export, simply inherit from the LogGroup class.

The S3 object key is designed so that it is easy to see which log group a log came from and from when. As written in the docstring, the hierarchy is dest_bucket/dest_obj_first_prefix (or log_group)/dest_obj_final_prefix/*. If dest_obj_first_prefix is not specified, the log_group name is used instead. After the *, the path continues with export task ID/log stream/file; this part is added automatically and cannot be controlled.
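As a concrete illustration, the key layout described above can be assembled like this (the bucket name matches the default below; the log group name and date are hypothetical):

```python
# Illustration with hypothetical values: how the S3 object prefix is laid out.
dest_bucket = 'lambda-cwlogs-s3'
log_group = 'example-log-group'      # used because dest_obj_first_prefix is empty
dest_obj_final_prefix = '2020-05-01'  # e.g. get_yesterday('%Y-%m-%d')

prefix = f'{log_group}/{dest_obj_final_prefix}'
# Full layout: s3://<dest_bucket>/<prefix>/<export task ID>/<log stream>/<file>
print(f's3://{dest_bucket}/{prefix}/')  # → s3://lambda-cwlogs-s3/example-log-group/2020-05-01/
```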

from abc import ABCMeta
from typing import ClassVar, Dict, Union

# get_specific_time_on_yesterday and get_yesterday are helper
# functions defined elsewhere in the repository


class LogGroup(object, metaclass=ABCMeta):
    """Configuration base class for exporting CloudWatch Logs to S3

    How to add a log group to export:

    class Example(LogGroup):
        log_group = 'test'
    """

    # log_group is required, otherwise optional
    log_group: ClassVar[str]

    log_stream: ClassVar[str] = ''
    start_time: ClassVar[int] = get_specific_time_on_yesterday(
        hour=0, minute=0, second=0)
    end_time: ClassVar[int] = get_specific_time_on_yesterday(
        hour=23, minute=59, second=59)
    dest_bucket: ClassVar[str] = 'lambda-cwlogs-s3'
    dest_obj_first_prefix: ClassVar[str] = ''
    dest_obj_final_prefix: ClassVar[str] = get_yesterday('%Y-%m-%d')

    @classmethod
    def get_dest_obj_prefix(cls) -> str:
        """Get the full S3 object prefix

        S3 hierarchy:
        dest_bucket/dest_obj_first_prefix/dest_obj_final_prefix/*

        Returns:
            str
        """

        first_prefix = cls.dest_obj_first_prefix or cls.log_group
        return f'{first_prefix}/{cls.dest_obj_final_prefix}'

    @classmethod
    def to_args(cls) -> Dict[str, Union[str, int]]:
        args: Dict[str, Union[str, int]] = {
            'logGroupName': cls.log_group,
            'fromTime': cls.start_time,
            'to': cls.end_time,
            'destination': cls.dest_bucket,
            'destinationPrefix': cls.get_dest_obj_prefix()
        }

        if cls.log_stream:
            args['logStreamNamePrefix'] = cls.log_stream

        return args
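The class above relies on two date helpers that are not shown here; see the repository for the real implementations. A minimal sketch of how they could look, assuming they compute dates in JST and return epoch milliseconds (the unit create_export_task expects for fromTime/to):

```python
from datetime import datetime, timedelta, timezone

JST = timezone(timedelta(hours=9))  # assumption: dates are computed in JST


def get_specific_time_on_yesterday(hour: int, minute: int, second: int) -> int:
    """Yesterday at the given time, as epoch milliseconds
    (the unit create_export_task expects for fromTime/to)."""
    yesterday = datetime.now(JST).date() - timedelta(days=1)
    dt = datetime(yesterday.year, yesterday.month, yesterday.day,
                  hour, minute, second, tzinfo=JST)
    return int(dt.timestamp() * 1000)


def get_yesterday(fmt: str) -> str:
    """Yesterday's date formatted with the given strftime pattern."""
    yesterday = datetime.now(JST) - timedelta(days=1)
    return yesterday.strftime(fmt)
```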

CloudWatch Logs Client

This client uses the following two CloudWatch Logs APIs: create_export_task and describe_export_tasks.

import boto3
from dataclasses import InitVar, dataclass, field
from typing import Type

# CloudWatchLogsClient is a type annotation for the boto3 logs client;
# the exception classes are defined elsewhere in the repository


@dataclass
class Exporter:
    region: InitVar[str]
    client: CloudWatchLogsClient = field(init=False)

    def __post_init__(self, region: str):
        self.client = boto3.client('logs', region_name=region)

    def export(self, target: Type[LogGroup]) -> str:
        """Export any CloudWatch Logs log group to S3

        Args:
            target (Type[LogGroup])

        Raises:
            ExportToS3Error

        Returns:
            str: taskId included in the response from the CloudWatch Logs API
        """

        try:
            response = self.client.create_export_task(
                **target.to_args())  # type: ignore
            return response['taskId']
        except Exception as err:
            raise ExportToS3Error(err)

    def get_export_progress(self, task_id: str) -> str:
        try:
            response = self.client.describe_export_tasks(taskId=task_id)
            status = response['exportTasks'][0]['status']['code']
            return status
        except Exception as err:
            raise GetExportTaskError(err)

    @classmethod
    def finishes(cls, status_code: str) -> bool:
        """Determine from the status code whether the export task has finished

        Args:
            status_code (str):
                Status code included in the describe_export_tasks response

        Raises:
            ExportToS3Failure: if the status code is CANCELLED or FAILED

        Returns:
            bool
        """

        uppercase_status_code = status_code.upper()

        if uppercase_status_code == 'COMPLETED':
            return True
        elif uppercase_status_code in ['CANCELLED', 'FAILED']:
            raise ExportToS3Failure('Export failure to S3')
        return False

main

The configured child classes of LogGroup are obtained with LogGroup.__subclasses__(). Since __subclasses__() returns a list, iterate over it with a for statement. Only one CloudWatch Logs export task can run at a time per account, so the describe_export_tasks API is called to check whether the task has completed. If it has not completed, wait 5 seconds and retry. Since create_export_task is an asynchronous API, there is no choice but to poll it this way.

def export_to_s3(exporter: Exporter, target: Type[LogGroup]) -> bool:
    task_id = exporter.export(target)
    logger.info(f'{target.log_group} is being exported to S3 ({task_id=})')

    while True:
        status = exporter.get_export_progress(task_id)
        if exporter.finishes(status):
            return True
        sleep(5)


def main(event: Any, context: Any) -> bool:
    exporter = Exporter(region='ap-northeast-1')
    targets = LogGroup.__subclasses__()
    logger.info(f'Number of log groups to export: {len(targets)}')

    for target in targets:
        try:
            export_to_s3(exporter, target)
        except GetExportTaskError as err:
            logger.warning(err)
            logger.warning(f'Failed to get export progress for {target.log_group}')
        except Exception as err:
            logger.error(err)
            logger.error(f'Failed to export {target.log_group} to S3')
        else:
            logger.info(f'Exported {target.log_group} to S3')

    return True
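To illustrate the __subclasses__() discovery pattern that main() relies on in isolation (the class names here are made up, not from the repository):

```python
class Base:
    """Stand-in for LogGroup in this illustration."""
    log_group: str


class App1Logs(Base):
    log_group = '/aws/lambda/app1'


class App2Logs(Base):
    log_group = '/aws/lambda/app2'


# Every direct subclass is discovered automatically; no manual registry needed.
targets = Base.__subclasses__()
print([t.log_group for t in targets])  # → ['/aws/lambda/app1', '/aws/lambda/app2']
```

This is why adding a new log group only requires defining one more subclass.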

serverless.yml

The following is a partial excerpt. Set up an IAM Role so that Lambda can create export tasks and get task information. Since I want to run the export every day at 14:00 JST, cron(0 5 * * ? *) is specified for events. CloudWatch Events runs on UTC, so subtracting 9 hours from 14:00 JST gives 05:00 UTC, which runs at 14:00 JST as expected.

  iamRoleStatements:
    - Effect: 'Allow'
      Action:
        - 'logs:CreateExportTask'
        - 'logs:DescribeExportTasks'
      Resource:
        - 'arn:aws:logs:${self:provider.region}:${self:custom.accountId}:log-group:*'

functions:
  export:
    handler: src/handler.main
    memorySize: 512
    timeout: 120
    events:
      - schedule: cron(0 5 * * ? *)
    environment:
      TZ: Asia/Tokyo
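The JST-to-UTC conversion for the schedule can be checked with simple modular arithmetic:

```python
# 14:00 JST minus the +9h offset gives the UTC hour for the cron expression.
jst_hour = 14
utc_hour = (jst_hour - 9) % 24
print(utc_hour)  # → 5, hence cron(0 5 * * ? *)
```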

In conclusion

homoluctus/lambda-cwlogs-s3 also contains a GitHub Actions workflow and a CloudFormation template for creating the destination S3 bucket. Please refer to it.
