This and that for using Step Functions with CDK + Python

This is the day-23 article of the CyberAgent Developers Advent Calendar 2020.

What is CDK?

CDK (AWS Cloud Development Kit) is a framework that wraps CloudFormation. It lets you manage AWS configuration in programming languages such as TypeScript and Python, and deploy it with the Node.js-based cdk command.

Official Document

Because you write it in a language you already use, CDK is approachable for server-side engineers who are not used to managing configuration in YAML or JSON. It is a good fit for projects where you want server-side engineers to own infrastructure management.

In addition, because configuration can be abstracted and standardized like ordinary code, it can express a wide range of setups.

In this article, I cover managing Step Functions configuration with CDK, from the basics to some slightly more advanced topics.

CDK + Python + StepFunctions

For CDK basics, refer to the getting-started guides on GitHub and elsewhere. This article focuses on the parts that deal with Step Functions.

Deploy Step Functions

First, the basic structure looks like this:


from aws_cdk import (
    core,
    aws_lambda as lambdas,
    aws_stepfunctions_tasks as tasks,
    aws_stepfunctions as sf
)


class SampleStack(core.Stack):

    def __init__(self,
                 scope: core.Construct,
                 id: str,
                 **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

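        # Lambda definition (construct id, runtime, and asset path "lambda/hello" are hypothetical)
        hello_lambda: lambdas.Function = lambdas.Function(self, "hello",
                                                          runtime=lambdas.Runtime.PYTHON_3_8,
                                                          handler="index.handler",
                                                          code=lambdas.Code.from_asset("lambda/hello"))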

        hello_task: tasks.LambdaInvoke = tasks.LambdaInvoke(self,
                                                            "[Lambda] hello",
                                                            lambda_function=hello_lambda)

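        # Lambda definition (construct id, runtime, and asset path "lambda/world" are hypothetical)
        world_lambda: lambdas.Function = lambdas.Function(self, "world",
                                                          runtime=lambdas.Runtime.PYTHON_3_8,
                                                          handler="index.handler",
                                                          code=lambdas.Code.from_asset("lambda/world"))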

        world_task: tasks.LambdaInvoke = tasks.LambdaInvoke(self,
                                                            "[Lambda] world",
                                                            lambda_function=world_lambda)

        definition: sf.Chain = hello_task.next(world_task)

        sf.StateMachine(self, "hello_workflow", definition=definition)

The code above does the following:

  1. Defines the required resources (the Lambda functions)
  2. Defines the tasks that invoke those resources
  3. Defines the workflow (the order of the tasks)
  4. Defines the state machine

[Figure: generated workflow]
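To see what hello_task.next(world_task) means in Amazon States Language (ASL), here is a hand-written, simplified sketch of the kind of definition the chain corresponds to: StartAt names the first task, each task names its successor in Next, and the last one has End. This is not the exact JSON CDK synthesizes, and the function ARNs are hypothetical placeholders.

```python
# Hand-written, simplified ASL for the hello -> world chain. This is NOT the exact
# JSON CDK synthesizes; the function ARNs are hypothetical placeholders.
definition = {
    "StartAt": "[Lambda] hello",
    "States": {
        "[Lambda] hello": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {
                "FunctionName": "arn:aws:lambda:REGION:ACCOUNT:function:hello",
                "Payload.$": "$",
            },
            "Next": "[Lambda] world",
        },
        "[Lambda] world": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {
                "FunctionName": "arn:aws:lambda:REGION:ACCOUNT:function:world",
                "Payload.$": "$",
            },
            "End": True,
        },
    },
}


def execution_order(asl: dict) -> list:
    """Follow StartAt and Next links and return state names in execution order."""
    order = []
    name = asl["StartAt"]
    while name is not None:
        order.append(name)
        state = asl["States"][name]
        # Each state either names its successor in "Next" or finishes with "End".
        name = None if state.get("End") else state.get("Next")
    return order


print(execution_order(definition))  # ['[Lambda] hello', '[Lambda] world']
```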

Passing values

To pass values from one task to the next, write it as follows:

from aws_cdk import (
    core,
    aws_lambda as lambdas,
    aws_stepfunctions_tasks as tasks,
    aws_stepfunctions as sf
)


class SampleStack(core.Stack):

    def __init__(self,
                 scope: core.Construct,
                 id: str,
                 **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

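        # Lambda definition (construct id, runtime, and asset path "lambda/hello" are hypothetical)
        hello_lambda: lambdas.Function = lambdas.Function(self, "hello",
                                                          runtime=lambdas.Runtime.PYTHON_3_8,
                                                          handler="index.handler",
                                                          code=lambdas.Code.from_asset("lambda/hello"))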

        hello_task: tasks.LambdaInvoke = tasks.LambdaInvoke(self,
                                                            "[Lambda] hello",
                                                            lambda_function=hello_lambda,
                                                            result_path="$.helloLambda")

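        # Lambda definition (construct id, runtime, and asset path "lambda/world" are hypothetical)
        world_lambda: lambdas.Function = lambdas.Function(self, "world",
                                                          runtime=lambdas.Runtime.PYTHON_3_8,
                                                          handler="index.handler",
                                                          code=lambdas.Code.from_asset("lambda/world"))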

        world_task: tasks.LambdaInvoke = tasks.LambdaInvoke(self,
                                                            "[Lambda] world",
                                                            lambda_function=world_lambda,
                                                            payload=sf.TaskInput.from_json_path_at("$.helloLambda.Payload"))

        definition: sf.Chain = sf.Chain.start(hello_task).next(world_task)

        sf.StateMachine(self, "hello_workflow", definition=definition)

In Step Functions, setting result_path inserts a task's result into the state's JSON at that path instead of replacing the whole input. Here, hello_task stores its result under $.helloLambda, and world_task receives $.helloLambda.Payload (the return value of the hello Lambda) as its payload. For details, see the Step Functions documentation (https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/input-output-resultpath.html).
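The effect of result_path can be modeled in a few lines of plain Python. This is a simplified sketch, not how the service is implemented: it only understands "$" and dotted paths, and the Lambda result is trimmed to two fields (LambdaInvoke's result is the whole Invoke response, with the function's return value under Payload, which is why world_task reads $.helloLambda.Payload).

```python
import copy


def apply_result_path(state_input: dict, result, result_path: str = "$"):
    """Simplified model of how Step Functions merges a task result into its input.

    Handles only "$" and dotted paths such as "$.helloLambda"; the real
    service supports more path syntax than this sketch.
    """
    if result_path == "$":
        # Default ResultPath: the result replaces the entire input.
        return result
    output = copy.deepcopy(state_input)  # leave the caller's input untouched
    keys = result_path[2:].split(".")  # "$.a.b" -> ["a", "b"]
    node = output
    for key in keys[:-1]:
        node = node.setdefault(key, {})
    node[keys[-1]] = result
    return output


# Trimmed LambdaInvoke result: the function's return value sits under "Payload".
lambda_result = {"StatusCode": 200, "Payload": "hello"}
state = {"date": "2020-12-23"}

print(apply_result_path(state, lambda_result, "$.helloLambda"))
# {'date': '2020-12-23', 'helloLambda': {'StatusCode': 200, 'Payload': 'hello'}}
print(apply_result_path(state, lambda_result))
# {'StatusCode': 200, 'Payload': 'hello'}  <- "date" from the earlier step is gone
```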

Error handling

By using add_catch, you can easily send a notification when an error occurs. I recommend including $$.Execution.Id, $.Error, and $.Cause in the notification, because they make it easier to investigate the cause and re-run the execution.

from aws_cdk import (
    core,
    aws_lambda as lambdas,
    aws_stepfunctions_tasks as tasks,
    aws_stepfunctions as sf
)


class SampleStack(core.Stack):

    def __init__(self,
                 scope: core.Construct,
                 id: str,
                 **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

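        # Lambda definition (construct id, runtime, and asset path "lambda/hello" are hypothetical)
        hello_lambda: lambdas.Function = lambdas.Function(self, "hello",
                                                          runtime=lambdas.Runtime.PYTHON_3_8,
                                                          handler="index.handler",
                                                          code=lambdas.Code.from_asset("lambda/hello"))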

        hello_task: tasks.LambdaInvoke = tasks.LambdaInvoke(self,
                                                            "[Lambda] hello",
                                                            lambda_function=hello_lambda,
                                                            result_path="$.helloLambda")

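        # Lambda definition (construct id, runtime, and asset path "lambda/world" are hypothetical)
        world_lambda: lambdas.Function = lambdas.Function(self, "world",
                                                          runtime=lambdas.Runtime.PYTHON_3_8,
                                                          handler="index.handler",
                                                          code=lambdas.Code.from_asset("lambda/world"))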

        world_task: tasks.LambdaInvoke = tasks.LambdaInvoke(self,
                                                            "[Lambda] world",
                                                            lambda_function=world_lambda,
                                                            payload=sf.TaskInput.from_json_path_at("$.helloLambda.Payload"))

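        # Lambda definition (construct id, runtime, and asset path are hypothetical)
        notification_error: lambdas.Function = lambdas.Function(self, "notification_error",
                                                                runtime=lambdas.Runtime.PYTHON_3_8,
                                                                handler="index.handler",
                                                                code=lambdas.Code.from_asset("lambda/notification_error"))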

        execution_id: str = sf.TaskInput.from_data_at("$$.Execution.Id").value
        err: str = sf.TaskInput.from_data_at("$.Error").value
        cause: str = sf.TaskInput.from_data_at("$.Cause").value

        notification_error_task: tasks.LambdaInvoke = tasks.LambdaInvoke(self,
                                                                         "[Lambda] notification_error",
                                                                         lambda_function=notification_error,
                                                                         payload=sf.TaskInput.from_object({
                                                                             "execution_id": execution_id,
                                                                             "error": err,
                                                                             "cause": cause
                                                                         }))

        job_failed: sf.Fail = sf.Fail(self,
                                      "Job Failed",
                                      cause="Job Failed",
                                      error="Workflow FAILED")

        error_handler: sf.Chain = notification_error_task.next(job_failed)

        hello_task.add_catch(error_handler, errors=['States.ALL'])
        world_task.add_catch(error_handler, errors=['States.ALL'])

        definition: sf.Chain = sf.Chain.start(hello_task).next(world_task)

        sf.StateMachine(self, "hello_workflow", definition=definition)

There are some points to keep in mind.

- If you do not chain an sf.Fail with .next() after the error-handling task, Step Functions will report the execution as successful.
- If you write notification_error_task.next(job_failed) separately in each place, you will get Error: State '[Lambda] notification_error' already has a next state. Build the chain once (error_handler) and pass it to every add_catch.

[Figure: generated workflow]
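On the receiving side, the notification_error Lambda gets the payload built with sf.TaskInput.from_object. A minimal sketch of its handler might look like the following. It assumes that, for a failed Lambda task, Cause is a JSON string containing errorMessage (other failures may carry plain text), and it only prints the message, since where notifications are sent (Slack, SNS, and so on) is not covered here. The handler name and the example values are hypothetical.

```python
import json


def handler(event, context):
    """Hypothetical handler for the notification_error Lambda.

    Expects the payload built with sf.TaskInput.from_object in the stack:
    {"execution_id": ..., "error": ..., "cause": ...}
    """
    cause = event.get("cause", "")
    detail = cause
    try:
        # Assumption: for a failed Lambda task, Cause is a JSON string that
        # contains "errorMessage"; other failures may carry plain text.
        parsed = json.loads(cause)
        if isinstance(parsed, dict) and "errorMessage" in parsed:
            detail = parsed["errorMessage"]
    except (TypeError, ValueError):
        pass  # not JSON: keep the raw cause
    message = "\n".join([
        "Workflow failed",
        f"execution: {event.get('execution_id')}",
        f"error: {event.get('error')}",
        f"cause: {detail}",
    ])
    # Sending the message somewhere is out of scope; this sketch only logs it.
    print(message)
    return {"message": message}


example_event = {
    "execution_id": "example-execution-id",  # made-up value
    "error": "Exception",
    "cause": json.dumps({"errorMessage": "something went wrong", "errorType": "Exception"}),
}
handler(example_event, None)
```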

Writing error handling more neatly with Parallel

The method above works, but sf.Parallel lets you write it a little more cleanly: wrap the whole chain in a single Parallel state and attach one catch to it, instead of calling add_catch on every task.

from aws_cdk import (
    core,
    aws_lambda as lambdas,
    aws_stepfunctions_tasks as tasks,
    aws_stepfunctions as sf
)


class SampleStack(core.Stack):

    def __init__(self,
                 scope: core.Construct,
                 id: str,
                 **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

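        # Lambda definition (construct id, runtime, and asset path "lambda/hello" are hypothetical)
        hello_lambda: lambdas.Function = lambdas.Function(self, "hello",
                                                          runtime=lambdas.Runtime.PYTHON_3_8,
                                                          handler="index.handler",
                                                          code=lambdas.Code.from_asset("lambda/hello"))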

        hello_task: tasks.LambdaInvoke = tasks.LambdaInvoke(self,
                                                            "[Lambda] hello",
                                                            lambda_function=hello_lambda,
                                                            result_path="$.helloLambda")

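        # Lambda definition (construct id, runtime, and asset path "lambda/world" are hypothetical)
        world_lambda: lambdas.Function = lambdas.Function(self, "world",
                                                          runtime=lambdas.Runtime.PYTHON_3_8,
                                                          handler="index.handler",
                                                          code=lambdas.Code.from_asset("lambda/world"))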

        world_task: tasks.LambdaInvoke = tasks.LambdaInvoke(self,
                                                            "[Lambda] world",
                                                            lambda_function=world_lambda,
                                                            payload=sf.TaskInput.from_json_path_at("$.helloLambda.Payload"))

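        # Lambda definition (construct id, runtime, and asset path are hypothetical)
        notification_error: lambdas.Function = lambdas.Function(self, "notification_error",
                                                                runtime=lambdas.Runtime.PYTHON_3_8,
                                                                handler="index.handler",
                                                                code=lambdas.Code.from_asset("lambda/notification_error"))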

        execution_id: str = sf.TaskInput.from_data_at("$$.Execution.Id").value
        err: str = sf.TaskInput.from_data_at("$.Error").value
        cause: str = sf.TaskInput.from_data_at("$.Cause").value

        notification_error_task: tasks.LambdaInvoke = tasks.LambdaInvoke(self,
                                                                         "[Lambda] notification_error",
                                                                         lambda_function=notification_error,
                                                                         payload=sf.TaskInput.from_object({
                                                                             "execution_id": execution_id,
                                                                             "error": err,
                                                                             "cause": cause
                                                                         }))

        job_failed: sf.Fail = sf.Fail(self,
                                      "Job Failed",
                                      cause="Job Failed",
                                      error="Workflow FAILED")

        # to_single_state wraps the chain in a Parallel state, so one catch covers every task inside it
        definition: sf.Parallel = sf.Chain.start(hello_task).next(world_task).to_single_state("definition")

        definition.add_catch(notification_error_task.next(job_failed))

        sf.StateMachine(self, "hello_workflow", definition=definition)

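With this definition, the catch fires when any task inside the Parallel state fails, so you only need to register the error handler once. Note that a Parallel state outputs an array with one element per branch, so a step placed after it receives the result as a list.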

[Figure: generated workflow]
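A toy model may help show why a single catch on the Parallel state is enough. The sketch below is plain Python, not Step Functions code: it runs each branch's steps in order (a real Parallel state runs branches concurrently), returns an array of branch results on success, and hands an object with Error and Cause to the catch handler as soon as any step raises. The step functions hello, world, broken, and notify are hypothetical stand-ins for the Lambda tasks.

```python
from typing import Callable, List


def run_parallel(branches: List[List[Callable]], state_input: dict,
                 on_error: Callable[[dict], dict]):
    """Toy model of a Parallel state with one Catch (not AWS code).

    Each branch is a list of steps run in order. If any step raises, the
    catch handler receives {"Error": ..., "Cause": ...} instead.
    """
    results = []
    try:
        for branch in branches:
            data = state_input
            for step in branch:
                data = step(data)
            results.append(data)
    except Exception as exc:
        return on_error({"Error": type(exc).__name__, "Cause": str(exc)})
    # On success, the output is an array with one element per branch.
    return results


def hello(data):
    return {**data, "greeting": "hello"}


def world(data):
    return {**data, "greeting": data["greeting"] + " world"}


def broken(data):
    raise RuntimeError("world lambda failed")


def notify(error_output):
    return {"notified": error_output}


print(run_parallel([[hello, world]], {}, notify))
# [{'greeting': 'hello world'}]
print(run_parallel([[hello, broken]], {}, notify))
# {'notified': {'Error': 'RuntimeError', 'Cause': 'world lambda failed'}}
```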

Conditional branch

Conditional branching can be implemented with sf.Choice. Chaining several branches like this can get hard to read, but that seems to be a deliberate trade-off in the API.

from aws_cdk import (
    core,
    aws_lambda as lambdas,
    aws_stepfunctions_tasks as tasks,
    aws_stepfunctions as sf
)


class SampleStack(core.Stack):

    def __init__(self,
                 scope: core.Construct,
                 id: str,
                 **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

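        # Lambda definition (construct id, runtime, and asset path are hypothetical)
        hello_or_world: lambdas.Function = lambdas.Function(self, "hello_or_world",
                                                            runtime=lambdas.Runtime.PYTHON_3_8,
                                                            handler="index.handler",
                                                            code=lambdas.Code.from_asset("lambda/hello_or_world"))

        # result_path must start with "$." so the result is stored under $.helloOrWorld
        hello_or_world_task: tasks.LambdaInvoke = tasks.LambdaInvoke(self,
                                                                     "[Lambda] hello or world",
                                                                     lambda_function=hello_or_world,
                                                                     result_path="$.helloOrWorld")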

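        # Lambda definition (construct id, runtime, and asset path "lambda/hello" are hypothetical)
        hello_lambda: lambdas.Function = lambdas.Function(self, "hello",
                                                          runtime=lambdas.Runtime.PYTHON_3_8,
                                                          handler="index.handler",
                                                          code=lambdas.Code.from_asset("lambda/hello"))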

        hello_task: tasks.LambdaInvoke = tasks.LambdaInvoke(self,
                                                            "[Lambda] hello",
                                                            lambda_function=hello_lambda)

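        # Lambda definition (construct id, runtime, and asset path "lambda/world" are hypothetical)
        world_lambda: lambdas.Function = lambdas.Function(self, "world",
                                                          runtime=lambdas.Runtime.PYTHON_3_8,
                                                          handler="index.handler",
                                                          code=lambdas.Code.from_asset("lambda/world"))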

        world_task: tasks.LambdaInvoke = tasks.LambdaInvoke(self,
                                                            "[Lambda] world",
                                                            lambda_function=world_lambda)

        job_failed: sf.Fail = sf.Fail(self,
                                      "Job Failed",
                                      cause="Job Failed",
                                      error="Workflow FAILED")

        definition: sf.Chain = sf.Chain.start(hello_or_world_task)\
            .next(
                sf.Choice(self, "hello or world ?")
                    .when(sf.Condition.string_equals("$.helloOrWorld.Payload", "hello"), hello_task)
                    .when(sf.Condition.string_equals("$.helloOrWorld.Payload", "world"), world_task)
                    .otherwise(job_failed)
            )

        sf.StateMachine(self, "hello_workflow", definition=definition)

[Figure: generated workflow]
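The Choice state compares $.helloOrWorld.Payload, so this setup assumes the hello_or_world Lambda returns the bare string "hello" or "world" (LambdaInvoke stores the function's return value under Payload). The routing can be pictured with the toy model below; it is plain Python written for illustration, not anything CDK generates. Rules are checked in order, the first match wins, and anything else falls through to the otherwise target. In this model a missing path (for example, from a mistyped result_path) raises KeyError.

```python
def choose_next(state: dict) -> str:
    """Toy model of the "hello or world ?" Choice state (not AWS code).

    Rules are checked in order and the first match wins; if none matches,
    the otherwise() target is used.
    """
    # Raises KeyError if the path is missing, e.g. after a mistyped result_path.
    payload = state["helloOrWorld"]["Payload"]
    rules = [
        ("hello", "[Lambda] hello"),  # sf.Condition.string_equals(..., "hello")
        ("world", "[Lambda] world"),  # sf.Condition.string_equals(..., "world")
    ]
    for expected, next_state in rules:
        if payload == expected:
            return next_state
    return "Job Failed"  # .otherwise(job_failed)


print(choose_next({"helloOrWorld": {"Payload": "world"}}))  # [Lambda] world
print(choose_next({"helloOrWorld": {"Payload": "foo"}}))    # Job Failed
```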

Pitfalls

Set result_path properly

This is simple, but be careful: if you forget to set result_path, it defaults to $, so the task's result replaces the entire state input and everything produced by earlier steps is lost.

Forcing a way around the type hints

This was the most troublesome part for me this time. When you run an EMR step from Step Functions (EmrAddStep), you pass its arguments as an array:

args=[
      "spark-submit",
      "--deploy-mode",
      "cluster",
      "--master",
      "yarn",
      "--class",
      "Main",
      "hoge.jar",
      "2020/12/23/01",
      "--tz",
      "utc"
]
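The array above is static. If part of it, such as the date, must be computed at runtime, one option is a Lambda that returns the whole list. The sketch below is hypothetical: the input key target_time (an ISO 8601 timestamp with an offset) and the function name are assumptions. If the task that invokes it uses result_path="$.hoge", the returned list ends up in $.hoge.Payload.

```python
from datetime import datetime, timezone


def build_args_handler(event, context):
    """Hypothetical Lambda that builds the spark-submit arguments for EmrAddStep.

    Assumes event["target_time"] is an ISO 8601 string with an offset,
    e.g. "2020-12-23T01:00:00+00:00".
    """
    target = datetime.fromisoformat(event["target_time"]).astimezone(timezone.utc)
    return [
        "spark-submit",
        "--deploy-mode", "cluster",
        "--master", "yarn",
        "--class", "Main",
        "hoge.jar",
        target.strftime("%Y/%m/%d/%H"),  # e.g. "2020/12/23/01"
        "--tz", "utc",
    ]


print(build_args_handler({"target_time": "2020-12-23T10:00:00+09:00"}, None))
```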

For example, if you try to pass the output of a previous step as the whole array, the type check rejects it, because args is declared as

args: typing.Optional[typing.List[str]]

So I first put a placeholder string in the array:

args=[
    "<$.hoge.Payload>"
],

Then I serialized the definition to a string and replaced the placeholder with a JSON path pointing at the array formatted by a Lambda function. (Please let me know if there is a better way.)

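    import json  # normally placed at the top of the module

    # emr_create_cluster and emr_add_step are the EMR tasks defined earlier (not shown)
    prepare_workflow: sf.Chain = sf.Chain.start(emr_create_cluster).next(emr_add_step)

    # Wrap the chain in a Parallel state so its state JSON can be obtained via to_state_json()
    definition: sf.Parallel = sf.Parallel(self, id="definition")

    definition.branch(prepare_workflow)

    # The single branch holds the StartAt/States definition of prepare_workflow
    definition_json = definition.to_state_json()["Branches"][0]

    # Swap the placeholder array for a JSON path reference ("Args.$") to the Lambda output
    definition_str = json.dumps(definition_json) \
            .replace('"Args": ["<$.hoge.Payload>"]',
                     '"Args.$": "$.hoge.Payload"', 1)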
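A variation on the same workaround, offered as a sketch rather than the method used above, is to rewrite the parsed definition (the definition_json dict) before serializing it. That way the replacement does not depend on the exact spacing json.dumps produces, and it handles every occurrence rather than only the first. The branch below is a hand-written, simplified example; its state name and the command-runner.jar value are assumptions.

```python
import json

PLACEHOLDER = "<$.hoge.Payload>"


def replace_args_placeholder(node, placeholder=PLACEHOLDER):
    """Recursively rewrite {"Args": ["<$.path>"]} into {"Args.$": "$.path"}."""
    if isinstance(node, dict):
        out = {}
        for key, value in node.items():
            if key == "Args" and value == [placeholder]:
                # Strip the angle brackets to recover the JSON path.
                out["Args.$"] = placeholder[1:-1]
            else:
                out[key] = replace_args_placeholder(value, placeholder)
        return out
    if isinstance(node, list):
        return [replace_args_placeholder(item, placeholder) for item in node]
    return node


# Simplified, hand-written branch (hypothetical state name and jar).
branch = {
    "StartAt": "emr_add_step",
    "States": {
        "emr_add_step": {
            "Type": "Task",
            "Parameters": {
                "Step": {"HadoopJarStep": {"Jar": "command-runner.jar", "Args": [PLACEHOLDER]}}
            },
            "End": True,
        }
    },
}

definition_str = json.dumps(replace_args_placeholder(branch))
print(definition_str)
```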
