[Python] Create a Batch environment using AWS-CDK

1.First of all

This time, we will implement the AWS Batch environment using CDK. There are many implementation examples in TypeScript, but there weren't many in Python, so I wrote an article.

1.1 Execution environment

The execution environment is as follows. In particular, I will not touch on the installation or the initial settings of aws-cli and aws-cdk. However, as a caveat, aws-cdk has a very high version update frequency, and even the contents currently written may not work.

1.2 Fee

What is worrisome is the price. When I operated under the following conditions, I was charged only the EC2 fee, which was about 0.01 [$ / day]. (In Batch, the instance is created after the queue is added to the job every time, and it is deleted when the job is completed.)

1.3 Procedure

Follow the steps below to prepare the Batch execution environment.

  1. Creating a Python script
  2. Creating a Dockerfile
  3. Register with ECR
  4. Write app in CDK

1.4 Preparation

The folder structure is as follows. The number on the right side of the file name corresponds to the number in the above procedure.

batch_example
└── src
    ├── docker
    │   ├── __init__.py (1)
    │   ├── Dockerfile (2)
    │   ├── requirements.txt (2)
    │   └── Makefile (3)
    └── batch_environment
        ├── app.py (4)
        ├── cdk.json
        └── README.md

2. Implementation

Now, proceed with the implementation according to the above procedure.

2.1 Creating a Python script

An example of script executed in Docker is shown below. click is used to pass command line arguments from CMD watchtower is used to write logs to CloudWatch Logs.

__init__.py


#For time parse
from datetime import datetime
from logging import getLogger, INFO
#Installation library
from boto3.session import Session
import click
import watchtower


#Get the value from the environment variable when specified by envvar
@click.command()
@click.option("--time")
@click.option("--s3_bucket", envvar='S3_BUCKET')
def main(time: str, s3_bucket: str):
    if time:
        #Parsing the time assuming that it will be executed from CloudWatch Event
        d = datetime.strptime(time, "%Y-%m-%dT%H:%M:%SZ")
        #Get execution date
        execute_date = d.strftime("%Y-%m-%d")

    #logger settings
    #The name of the logger becomes the name of the log stream
    logger_name = f"{datetime.now().strftime('%Y/%m/%d')}"
    logger = getLogger(logger_name)
    logger.setLevel(INFO)
    #Specify the name of the CloudWatch Logs log group here
    #Send logs via IAM Role, passing Session
    handler = watchtower.CloudWatchLogHandler(log_group="/aws/some_project", boto3_session=Session())
    logger.addHandler(handler)

    #Scheduled processing
    #Here, only write the execution date and time to CloudWatch Logs
    logger.info(f"{execute_date=}")
    
if __name__ == "__main__":
    """
    python __init__.py 
        --time 2020-09-11T12:30:00Z
        --s3_bucket your-bucket-here
    """
    main()

2.2 Creating a Dockerfile

Next, create a Dockerfile that executes the above Python script. I built it in multiple stages by referring to here.

Dockerfile


#This is a build container
FROM python:3.8-buster as builder

WORKDIR /opt/app

COPY requirements.txt /opt/app
RUN pip3 install -r requirements.txt

#From here, prepare the container for execution
FROM python:3.8-slim-buster as runner

COPY --from=builder /usr/local/lib/python3.8/site-packages /usr/local/lib/python3.8/site-packages
COPY src /opt/app/src

WORKDIR /opt/app/src
CMD ["python3", "__init__.py"]

At the same time, put the library to be used in requirements.txt.

requirements.txt


click
watchtower

2.3 Registration to ECR

After creating the Dockerfile, register it in ECR. First, create a repository by pressing the "Create Repository" button on the ECR from the console.

スクリーンショット 2020-09-27 22.47.35.png

Set the name of the repository appropriately.

スクリーンショット 2020-09-27 22.49.17.png

Select the repository you created and press the "Show Push Command" button.

スクリーンショット 2020-09-27 22.50.23.png

Then, the commands required for pushing will be displayed, so ** copy and execute without thinking. ** ** If you fail here, I think that the AWS CLI settings are not working properly, so please review the AWS CLI settings.

スクリーンショット 2020-09-27 22.51.53.png

Since it is difficult to type the command every time, create a Makefile that copies the above command. (The command --username AWS in 1 seems to be a constant.)

Makefile


.PHONY: help
help:
	@echo " == push docker image to ECR == "
	@echo "type 'make build tag push' to push docker image to ECR"
	@echo ""

.PHONY: login
login:
	(1 command)aws ecr get-login-password --region ap-northeast-1 | docker login --username AWS --password-stdin {ACCOUNT_NUMBER}.dkr.ecr.ap-northeast-1.amazonaws.com

.PHONY: build
build:
	(2 commands)docker build -t {REPOSITORY_NAME} .

.PHONY: tag
tag:
	(3 commands)docker tag {REPOSITORY_NAME}:latest {ACCOUNT_NUMBER}.dkr.ecr.ap-northeast-1.amazonaws.com/{REPOSITORY_NAME}:latest

.PHONY: push
push:
	(4 commands)docker push {ACCOUNT_NUMBER}.dkr.ecr.ap-northeast-1.amazonaws.com/{REPOSITORY_NAME}:latest

By using this Makefile, you can easily shorten the command as follows. In addition, I don't think there is any dangerous information in the above Makefile even if it is leaked to the outside, so I can share the source code.

#Log in to ECR
$ make login

#Push the latest image to ECR
$ make build tag push

2.4 CDK implementation

The implementation contents of the CDK are based on this article written in TypeScript. In addition, it is better to execute $ cdk init in the directory where app.py is implemented in advance.

2.4.1 Installing the packages required for implementation

Each package name is long ... In addition, the installation time is quite long.

$ pip install aws-cdk-core aws-cdk-aws-stepfunctions aws-cdk-aws-stepfunctions-tasks aws-cdk-aws-events-targets aws-cdk.aws-ec2 aws-cdk.aws-batch aws-cdk.aws-ecr

2.4.2 Implementation of app.py

First, create a class for the environment to be built this time. Stack_name and stack_env are set as arguments of the BatchEnvironment class. This corresponds to the name of this environment and the execution environment (verification / development / production). (If you really want to separate the execution environment, I think you need to change the ECR repository as well.)

app.py


from aws_cdk import (
    core,
    aws_ec2,
    aws_batch,
    aws_ecr,
    aws_ecs,
    aws_iam,
    aws_stepfunctions as aws_sfn,
    aws_stepfunctions_tasks as aws_sfn_tasks,
    aws_events,
    aws_events_targets,
)


class BatchEnvironment(core.Stack):
    """
Batch environment and Step Functions to execute it+Create a CloudWatch Event environment

    """
    #ECR repository name created above
    #Pull images from this repository when running in Batch
    ECR_REPOSITORY_ARN = "arn:aws:ecr:ap-northeast-1:{ACCOUNT_NUMBER}:repository/{YOUR_REPOSITORY_NAME}"

    def __init__(self, app: core.App, stack_name: str, stack_env: str):
        super().__init__(scope=app, id=f"{stack_name}-{stack_env}")
        #The following implementation is the image below here.

2.4.3 Implementation of app.py (Creation of VPC environment)

app.py


        # def __init__(...):in

        #CIDR has your favorite range
        cidr = "192.168.0.0/24"

        # === #
        # vpc #
        # === #
        #VPCs are (should) be available for free if you only use public subnets
        vpc = aws_ec2.Vpc(
            self,
            id=f"{stack_name}-{stack_env}-vpc",
            cidr=cidr,
            subnet_configuration=[
                #Define netmask for Public Subnet
                aws_ec2.SubnetConfiguration(
                    cidr_mask=28,
                    name=f"{stack_name}-{stack_env}-public",
                    subnet_type=aws_ec2.SubnetType.PUBLIC,
                )
            ],
        )

        security_group = aws_ec2.SecurityGroup(
            self,
            id=f'security-group-for-{stack_name}-{stack_env}',
            vpc=vpc,
            security_group_name=f'security-group-for-{stack_name}-{stack_env}',
            allow_all_outbound=True
        )

        batch_role = aws_iam.Role(
            scope=self,
            id=f"batch_role_for_{stack_name}-{stack_env}",
            role_name=f"batch_role_for_{stack_name}-{stack_env}",
            assumed_by=aws_iam.ServicePrincipal("batch.amazonaws.com")
        )

        batch_role.add_managed_policy(
            aws_iam.ManagedPolicy.from_managed_policy_arn(
                scope=self,
                id=f"AWSBatchServiceRole-{stack_env}",
                managed_policy_arn="arn:aws:iam::aws:policy/service-role/AWSBatchServiceRole"
            )
        )

        batch_role.add_to_policy(
            aws_iam.PolicyStatement(
                effect=aws_iam.Effect.ALLOW,
                resources=[
                    "arn:aws:logs:*:*:*"
                ],
                actions=[
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "logs:DescribeLogStreams"
                ]
            )
        )

        #Role given to EC2
        instance_role = aws_iam.Role(
            scope=self,
            id=f"instance_role_for_{stack_name}-{stack_env}",
            role_name=f"instance_role_for_{stack_name}-{stack_env}",
            assumed_by=aws_iam.ServicePrincipal("ec2.amazonaws.com")
        )

        instance_role.add_managed_policy(
            aws_iam.ManagedPolicy.from_managed_policy_arn(
                scope=self,
                id=f"AmazonEC2ContainerServiceforEC2Role-{stack_env}",
                managed_policy_arn="arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role"
            )
        )

        #Add policy to access S3
        instance_role.add_to_policy(
            aws_iam.PolicyStatement(
                effect=aws_iam.Effect.ALLOW,
                resources=["*"],
                actions=["s3:*"]
            )
        )

        #Add policy to access CloudWatch Logs
        instance_role.add_to_policy(
            aws_iam.PolicyStatement(
                effect=aws_iam.Effect.ALLOW,
                resources=[
                    "arn:aws:logs:*:*:*"
                ],
                actions=[
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "logs:DescribeLogStreams"
                ]
            )
        )

        #Grant EC2 a role
        instance_profile = aws_iam.CfnInstanceProfile(
            scope=self,
            id=f"instance_profile_for_{stack_name}-{stack_env}",
            instance_profile_name=f"instance_profile_for_{stack_name}-{stack_env}",
            roles=[instance_role.role_name]
        )

2.4.4 Implementation of app.py (Batch execution environment, job definition, job queue creation)

app.py


        #Continuation of VPC...

        # ===== #
        # batch #
        # ===== #
        batch_compute_resources = aws_batch.ComputeResources(
            vpc=vpc,
            maxv_cpus=4,
            minv_cpus=0,
            security_groups=[security_group],
            instance_role=instance_profile.attr_arn,
            type=aws_batch.ComputeResourceType.SPOT
        )

        batch_compute_environment = aws_batch.ComputeEnvironment(
            scope=self,
            id=f"ProjectEnvironment-{stack_env}",
            compute_environment_name=f"ProjectEnvironmentBatch-{stack_env}",
            compute_resources=batch_compute_resources,
            service_role=batch_role
        )

        job_role = aws_iam.Role(
            scope=self,
            id=f"job_role_{stack_name}-{stack_env}",
            role_name=f"job_role_{stack_name}-{stack_env}",
            assumed_by=aws_iam.ServicePrincipal("ecs-tasks.amazonaws.com")
        )

        job_role.add_managed_policy(
            aws_iam.ManagedPolicy.from_managed_policy_arn(
                scope=self,
                id=f"AmazonECSTaskExecutionRolePolicy_{stack_name}-{stack_env}",
                managed_policy_arn="arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"

            )
        )

        job_role.add_managed_policy(
            aws_iam.ManagedPolicy.from_managed_policy_arn(
                scope=self,
                id=f"AmazonS3FullAccess_{stack_name}-{stack_env}",
                managed_policy_arn="arn:aws:iam::aws:policy/AmazonS3FullAccess"

            )
        )

        job_role.add_managed_policy(
            aws_iam.ManagedPolicy.from_managed_policy_arn(
                scope=self,
                id=f"CloudWatchLogsFullAccess_{stack_name}-{stack_env}",
                managed_policy_arn="arn:aws:iam::aws:policy/CloudWatchLogsFullAccess"
            )
        )

        batch_job_queue = aws_batch.JobQueue(
            scope=self,
            id=f"job_queue_for_{stack_name}-{stack_env}",
            job_queue_name=f"job_queue_for_{stack_name}-{stack_env}",
            compute_environments=[
                aws_batch.JobQueueComputeEnvironment(
                    compute_environment=batch_compute_environment,
                    order=1
                )
            ],
            priority=1
        )

        #Get ECR repository
        ecr_repository = aws_ecr.Repository.from_repository_arn(
            scope=self,
            id=f"image_for_{stack_name}-{stack_env}",
            repository_arn=self.ECR_REPOSITORY_ARN
        )

        #Obtaining an image from ECR
        container_image = aws_ecs.ContainerImage.from_ecr_repository(
            repository=ecr_repository
        )

        #Job definition
        #Here, use it in Python script`S3_BUCKET`As an environment variable
        batch_job_definition = aws_batch.JobDefinition(
            scope=self,
            id=f"job_definition_for_{stack_env}",
            job_definition_name=f"job_definition_for_{stack_env}",
            container=aws_batch.JobDefinitionContainer(
                image=container_image,
                environment={
                    "S3_BUCKET": f"{YOUR_S3_BUCKET}"
                },
                job_role=job_role,
                vcpus=1,
                memory_limit_mib=1024
            )
        )


2.4.5 Implementation of app.py (Create StepFunctions + CloudWatch Events)

From here, it is not always necessary to build the Batch environment, It is done using Step Functions and CloudWatch Event for periodic execution.

You can also call Batch directly from CloudWatch Event, Step Functions are inserted in between, considering the ease of cooperation with other services and the passing of parameters.

When registering as a Step Functions step Overwrite the Docker CMD command (= set in Batch job definition) and It takes the argument time from CloudWatch Event and passes it to the Python script.

app.py


        #Continued from Batch...

        # ============= #
        # StepFunctions #
        # ============= #

        command_overrides = [
            "python", "__init__.py",
            "--time", "Ref::time"
        ]

        batch_task = aws_sfn_tasks.BatchSubmitJob(
            scope=self,
            id=f"batch_job_{stack_env}",
            job_definition=batch_job_definition,
            job_name=f"batch_job_{stack_env}_today",
            job_queue=batch_job_queue,
            container_overrides=aws_sfn_tasks.BatchContainerOverrides(
                command=command_overrides
            ),
            payload=aws_sfn.TaskInput.from_object(
                {
                    "time.$": "$.time"
                }
            )
        )

        #This time there is only one step, so it's simple, but if you want to connect multiple steps
        # batch_task.next(aws_sfn_tasks.JOB).next(aws_sfn_tasks.JOB)
        #You can pass it with a chain method like this.
        definition = batch_task

        sfn_daily_process = aws_sfn.StateMachine(
            scope=self,
            id=f"YourProjectSFn-{stack_env}",
            definition=definition
        )

        # ================ #
        # CloudWatch Event #
        # ================ #

        # Run every day at 21:30 JST
        # See https://docs.aws.amazon.com/lambda/latest/dg/tutorial-scheduled-events-schedule-expressions.html
        events_daily_process = aws_events.Rule(
            scope=self,
            id=f"DailySFnProcess-{stack_env}",
            schedule=aws_events.Schedule.cron(
                minute=31,
                hour=12,
                month='*',
                day="*",
                year='*'),
        )
        events_daily_process.add_target(aws_events_targets.SfnStateMachine(sfn_daily_process))

        #Up to here def__init__(...):

2.4.6 Implementation of app.py (implementation of main function)

Finally, write the process to execute the CDK and you're done.

app.py


#Here def__init__(...):


def main():
    app = core.App()
    BatchEnvironment(app, "your-project", "feature")
    BatchEnvironment(app, "your-project", "dev")
    BatchEnvironment(app, "your-project", "prod")
    app.synth()


if __name__ == "__main__":
    main()

2.5 deploy

After the above script is completed, check if the CDK is set correctly with the following command, and then deploy it. Even if you create a Batch environment from scratch, it will be completed in about 10 minutes.

#Confirmation of definition
$ cdk synth

Successfully synthesized to {path_your_project}/cdk.out
Supply a stack id (your-project-dev, your-project-feature, your-project-prod) to display its template.

#Confirmation of deployable environment
$ cdk ls

your-project-dev
your-project-feature
your-project-prod

$ cdk deploy your-project-feature

...deploying...

2.5.1 Check if the environment is created correctly

When the deployment is complete, select the Step Functions you created from the console and press the "Start Execution" button.

スクリーンショット 2020-09-27 23.38.58.png

Put only the argument of time,

{
    "time": "2020-09-27T12:31:00Z"
}

スクリーンショット 2020-09-27 23.45.12.png

If it works correctly, you're done. Also, check CloudWatch Logs to see if it works as expected.

3. Conclusion

I really like CDK because you can quickly build and delete the environment with commands!

Also, rather than creating from the console, you can see what is required by the program parameters, so Even if you don't know the service, I thought it would be nice to understand what parameters are required!

(Someday, I will expand the above source in the GitHub repository ...!)

Recommended Posts

[Python] Create a Batch environment using AWS-CDK
Create a Python environment
Create a python GUI using tkinter
Create a Python environment on Mac (2017/4)
Create a virtual environment with Python!
Create a python environment on centos
Create a python environment on your Mac
Let's create a virtual environment for Python
Create a simple scheduled batch using Docker's Python Image and parse-crontab
Create a Vim + Python test environment in 1 minute
Create a GIF file using Pillow in Python
Create a virtual environment with conda in Python
Create a python3 build environment with Sublime Text3
Create a web map using Python and GDAL
[Venv] Create a python virtual environment on Ubuntu
Create a Mac app using py2app and Python3! !!
Create a MIDI file in Python using pretty_midi
Create a Python execution environment on IBM i
[Docker] Create a jupyterLab (python) environment in 3 minutes!
Create a Python virtual development environment on Windows
Create a data collection bot in Python using Selenium
Create a Wox plugin (Python)
Create a function in Python
Create a dictionary in Python
How to set up a Python environment using pyenv
[CRUD] [Django] Create a CRUD site using the Python framework Django ~ 1 ~
[Python] Create a ValueObject with a complete constructor using dataclasses
Create a comfortable Python 3 (Anaconda) development environment on windows
Create a python development environment with vagrant + ansible + fabric
Create JIRA tickets using Python
Build a Python virtual environment using venv (Django + MySQL ①)
Building a Python virtual environment
Build a Python environment on your Mac using pyenv
Create a python numpy array
[CRUD] [Django] Create a CRUD site using the Python framework Django ~ 2 ~
Build a Python development environment using pyenv on MacOS
Build a Python environment offline
Create a directory with python
Create a decent shell and python environment on Windows
Building a Python virtual environment
Memo for building a machine learning environment using Python
Create a company name extractor with python using JCLdic
Create a Python development environment on OS X Lion
[CRUD] [Django] Create a CRUD site using the Python framework Django ~ 3 ~
[CRUD] [Django] Create a CRUD site using the Python framework Django ~ 4 ~
[CRUD] [Django] Create a CRUD site using the Python framework Django ~ 5 ~
Create a Python (pyenv / virtualenv) development environment on Mac (Homebrew)
Building a Python environment on a Mac and using Jupyter lab
Create a simple Python development environment with VSCode & Docker Desktop
Create a virtual environment for python on mac [Very easy]
Until building a Python development environment using pyenv on Ubuntu 20.04
Building a Python environment on Mac
I made a Line-bot using Python!
Create a DI Container in Python
Building a Python environment on Ubuntu
Drawing a silverstone curve using python
Create a nested dictionary using defaultdict
Create a binary file in Python
Building a virtual environment with Python 3
Create a Linux environment on Windows 10
Create a Python general-purpose decorator framework