[PYTHON] I tried to create serverless batch processing for the first time with DynamoDB and Step Functions

Serverless batch on AWS

This is about creating a batch-like process that takes data from a database in one AWS account and writes it into a database in another account (the two accounts are already connected via VPC Peering).

qiita1.png

Partly to get hands-on experience with more AWS services, I first put DynamoDB in between and split the processing into [Lambda](https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/welcome.html) functions.

qiita2.png

The RDS access credentials are stored in Secrets Manager, and the overall flow is built with [Step Functions](https://docs.aws.amazon.com/ja_jp/step-functions/latest/dg/welcome.html). The whole thing is scheduled with CloudWatch Events.

qiita3.png

Let's build each one.

① Lambda [RDS to DynamoDB] Accessing RDS from Lambda is considered an anti-pattern because of connection pool exhaustion [^1], but since this batch runs on a schedule trigger, the number of concurrent executions should not be an issue.

cf. Lambda + RDS is an anti-pattern

[^1]: This issue appears to be resolved by RDS Proxy, which was announced in preview.

The function is placed in a VPC for RDS access, with the required IAM policies attached.

The runtime is **Python 3.8**. In addition to boto3, it uses PyMySQL for RDS (MySQL) access. (pip install it locally and upload the function as a zip.)

Secrets Manager stores secret information such as RDS and DocumentDB credentials, and can automatically handle password rotation depending on the settings. You can store arbitrary items as a secret, not just an ID and password: host, port, database name, and so on.

By the way, when creating a secret of the "RDS" type here, you can only select RDS instances in the same account. Since I wanted to store the credentials of the RDS on the peering side, I created it as an arbitrary secret under "Other" instead. (No rotation.)
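As an aside, the same kind of arbitrary secret can also be registered from code instead of the console; a minimal boto3 sketch (the secret name and all values are placeholders):

```python
import json

import boto3

client = boto3.client('secretsmanager')

# Register the peered RDS connection info as one arbitrary secret (all values are placeholders)
client.create_secret(
    Name='my-rds-secret',
    SecretString=json.dumps({
        'host': 'peered-rds.example.internal',
        'port': 3306,
        'username': 'batch_user',
        'password': 'CHANGE_ME',
        'dbname': 'source_db'
    })
)
```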

Once the secret is created, sample code for each language is generated, which you can basically copy and paste. (It can also be viewed again later.)

The sample says to use `get_secret_value_response['SecretString']` or `base64.b64decode(get_secret_value_response['SecretBinary'])`, so I simply return that value.
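For reference, the generated sample (with the error handling trimmed) boils down to something like the sketch below; the secret name and region here are placeholders.

```python
import base64

import boto3


def get_secret():
    secret_name = 'my-rds-secret'    # placeholder: name of the secret created above
    region_name = 'ap-northeast-1'   # placeholder: region where the secret lives

    client = boto3.client('secretsmanager', region_name=region_name)
    response = client.get_secret_value(SecretId=secret_name)

    if 'SecretString' in response:
        # The arbitrary secret is stored as a JSON string of key/value pairs
        return response['SecretString']
    return base64.b64decode(response['SecretBinary'])
```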



The caller looks like this.


#### **`lambda.py`**
```python
import json

import pymysql

...

    secrets = json.loads(get_secret())

    # Connect to the peered RDS (MySQL) with the values stored in Secrets Manager
    conn = pymysql.connect(
        host=secrets['host'],
        user=secrets['username'],
        password=secrets['password'],
        database=secrets['dbname'],
        cursorclass=pymysql.cursors.DictCursor
    )
```
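The rows passed to DynamoDB later (`data` below) can then be fetched with the DictCursor so that each row comes back as a dict; the table name and query here are placeholders.

```python
    # Fetch the source rows as dicts (column name -> value); table name is a placeholder
    with conn.cursor() as cursor:
        cursor.execute('SELECT * FROM source_table')
        data = cursor.fetchall()
    conn.close()
```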

DynamoDB: create the required table in advance. The primary key consists of a **partition key** (required) and a **sort key** (optional).

cf. I summarized the key index of DynamoDB

What an RDB calls a record is called an **item** in DynamoDB.

Data is written with PutItem. The supported data types differ from MySQL's, so writing a date type as-is results in an error.

cf. Naming Rules and Data Types #Data Types

Also, since empty strings cannot be stored, they have to be converted explicitly to the Null type.

cf. The story that data could not be entered in DynamoDB if it was left empty when using Boto3

cf. DynamoDB can't register an empty value, but you can convert an empty value to a NULL type just by passing an option to the DynamoDB document client constructor?

#### **`lambda.py`**
```python
import datetime

import boto3

...

    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('table-name')

    for row in data:
        for k, v in row.items():
            if isinstance(v, str) and v == '':
                # DynamoDB rejects empty strings, so explicitly store None (the Null type) instead
                row[k] = None
            elif isinstance(v, (datetime.date, datetime.datetime, datetime.time)):
                # DynamoDB has no date/time type, so convert to a string
                row[k] = v.isoformat()  # strftime works just as well, of course

        # PutItem overwrites any existing item with the same key
        res = table.put_item(
            Item=row
        )
```

There is also `UpdateItem`, which updates only the specified attributes, similar to UPDATE in an RDB.
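A minimal sketch of such an update (the attribute name, values, and key names are placeholders, not from the actual batch):

```python
    # Update only the 'status' attribute of the item identified by its primary key
    res = table.update_item(
        Key={
            'partition-key-name': VALUE1,
            'sort-key-name': VALUE2
        },
        UpdateExpression='SET #st = :val',
        ExpressionAttributeNames={'#st': 'status'},      # alias in case the name clashes with a reserved word
        ExpressionAttributeValues={':val': 'processed'}
    )
```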

② Lambda [DynamoDB to RDS] This function is also placed in the VPC, with the required IAM policy attached.

DynamoDB: GetItem retrieves a single item by specifying its primary key.

#### **`lambda.py`**
```python
import boto3

...

    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('table-name')

    # GetItem (here the key includes a sort key)
    res = table.get_item(
        Key={
            'partition-key-name': VALUE1,
            'sort-key-name': VALUE2
        }
    )

    res['Item']  # the retrieved item; if nothing matched, the 'Item' key is absent
```

There are also Query and Scan. My rough understanding of how to use them: GetItem fetches a single item by its full primary key, Query fetches the items matching a key condition (optionally through an index), and Scan reads the entire table.

Basically, it seems best to use Query.

cf. I tried DynamoDB using Python (boto3)

#### **`lambda.py`**
```python
import boto3
from boto3.dynamodb.conditions import Key  # needed to build key condition expressions

...

    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('table-name')

    # Query: by key (partition key & sort key)
    res = table.query(
        KeyConditionExpression=Key('partition-key-name').eq(VALUE1) & Key('sort-key-name').eq(VALUE2)
    )

    # Query: via an index (partition key only)
    res = table.query(
        IndexName='index-name',
        KeyConditionExpression=Key('partition-key-name').eq(VALUE3)
    )

    res['Count']  # number of items returned
    res['Items']  # list of retrieved items
```

When you create an index in DynamoDB you choose a **projection**, and the index behaves like a copy of the table. Depending on the table's data size, it may be better to project only the keys rather than all attributes, and process in two steps: get the key via the index, then fetch the full item again by that key.
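A minimal sketch of that two-step pattern, assuming a keys-only index named 'index-name' (all key names and values are placeholders):

```python
import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('table-name')

# Step 1: query the keys-only index to find the primary keys of matching items
res = table.query(
    IndexName='index-name',
    KeyConditionExpression=Key('index-partition-key-name').eq(VALUE3)
)

# Step 2: fetch each full item from the base table by its primary key
for keys in res['Items']:
    item = table.get_item(
        Key={
            'partition-key-name': keys['partition-key-name'],
            'sort-key-name': keys['sort-key-name']
        }
    )['Item']
```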

③ Step Functions lets you build a processing flow in JSON (and what you combine is not limited to Lambda). You can express conditional branching and parallel processing, so flows can be assembled quite flexibly. Honestly, the bare definition is hard to read and write, but the console checks the syntax and automatically draws a flow diagram, which helps a lot. The flow you create is called a **state machine**.

cf. Serverless batch system created with AWS Step Functions

To call Lambda, the state machine's IAM role needs `lambda:InvokeFunction`.

The functions in ① and ② are actually split a bit more finely, so this is where the flow gets defined. Serial and parallel processing look roughly like this.

Serial processing of Lambda functions


```json
{
  "Comment": "Comment Comment Comment",
  "StartAt": "First Process",
  "States": {
    "First Process": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
      "Next": "Second Process"
    },
    "Second Process": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
      "End": true
    }
  }
}
```

Parallel processing of Lambda functions


```json
{
  "Comment": "Comment Comment Comment",
  "StartAt": "Main Process",
  "States": {
    "Main Process": {
      "Type": "Parallel",
      "End": true,
      "Branches": [
        {
          "StartAt": "Branch Process A",
          "States": {
            "Branch Process A": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
              "End": true
            }
          }
        },
        {
          "StartAt": "Branch Process B",
          "States": {
            "Branch Process B": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:FUNCTION_NAME",
              "End": true
            }
          }
        }
      ]
    }
  }
}
```
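For testing before the schedule is wired up, an execution can also be kicked off from code; a minimal boto3 sketch (the state machine ARN is a placeholder):

```python
import json

import boto3

sfn = boto3.client('stepfunctions')

# Start one execution of the state machine (ARN is a placeholder)
res = sfn.start_execution(
    stateMachineArn='arn:aws:states:REGION:ACCOUNT:stateMachine:STATE_MACHINE_NAME',
    input=json.dumps({})
)
print(res['executionArn'])
```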

CloudWatch Events: state machines can be triggered by CloudWatch Events. For scheduled execution you specify either a rate expression or a cron expression. I got tripped up by the cron expression, so a few reminders:

- Six fields: minutes, hours, day-of-month, month, day-of-week, year
- Either the day-of-month or the day-of-week field must be `?` (using `*` in both, or `*` in one and a concrete value in the other, is rejected)
- Times are UTC regardless of the region, so specify 9 hours earlier than Japan time

cf. Schedule Expressions for Rules
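I set the rule up in the console, but as a sketch the same schedule could be wired up with boto3 (the rule name, ARNs, and the IAM role that allows `states:StartExecution` are placeholders). The cron below fires daily at 0:00 UTC, i.e. 9:00 JST:

```python
import boto3

events = boto3.client('events')

# Daily at 0:00 UTC (= 9:00 JST); six fields: minutes hours day-of-month month day-of-week year
events.put_rule(
    Name='daily-batch-rule',
    ScheduleExpression='cron(0 0 * * ? *)',
    State='ENABLED'
)

# Point the rule at the state machine; RoleArn must allow states:StartExecution
events.put_targets(
    Rule='daily-batch-rule',
    Targets=[{
        'Id': 'daily-batch-target',
        'Arn': 'arn:aws:states:REGION:ACCOUNT:stateMachine:STATE_MACHINE_NAME',
        'RoleArn': 'arn:aws:iam::ACCOUNT:role/ROLE_NAME'
    }]
)
```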

In conclusion

All of these services were new to me, but they were very interesting. Let's serverless!

That said, now that I have tried it, I do wonder how best to manage the Lambda code and the development environment...
