Boto3 (manipulate AWS resources with Python library) API that is often used privately

*** Below are some of the APIs listed in the "AWS SDK for Python" that are related to boto3 (Python library) that I often use. *** *** https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/index.html

*** Please read the resource name and various setting values as appropriate ~ ***

Athena

# -*- coding: utf-8 -*-
import boto3
import time
athena = boto3.client("athena")

class QueryFailed(Exception):
    """
Exception class called when Athena query execution fails
    """
    pass

#Query execution (asynchronous processing)
start_query_response = athena.start_query_execution(
    QueryString = 'Query',
    QueryExecutionContext={
        "Database": "GlueDB name used to query"
    },
    ResultConfiguration={
        "OutputLocation" : "s3://Output bucket name/Key"
    }
)
query_execution_id = start_query_response["QueryExecutionId"]
#Check the execution status of the query
while True:
    query_status = athena.get_query_execution(
        QueryExecutionId = query_execution_id
    )
    query_execution_status = query_status['QueryExecution']['Status']['State']
    if query_execution_status == 'SUCCEEDED':
        break
    elif query_execution_status == "FAILED" or query_execution_status == "CANCELLED":
        raise QueryFailed(f"query_execution_status = {query_execution_status}")
        time.sleep(10)
#Get query execution result (only if successful)
query_results = athena.get_query_results(QueryExecutionId = query_execution_id)

CloudWatchLogs


# -*- coding: utf-8 -*-
from datetime import datetime,timezone,timedelta
import os
import boto3
import time
import logging
import traceback

#Log settings
logger = logging.getLogger()
logger.setLevel(os.getenv('LOG_LEVEL', logging.DEBUG))
logs = boto3.client("logs")
BUCKET_NAME = os.environ["BUCKET_NAME"]
WAITING_TIME = int(os.environ["WAITING_TIME"])

#Set timezone to Japan time (JST)
JST = timezone(timedelta(hours=9),"JST")

#Date type when outputting logs to S3
DATE_FORMAT = "%Y-%m-%d"

def lambda_handler(event, context):
    """
Output one day's worth of CloudWatch Logs to S3.
The target time is as follows.
    AM 00:00:00.000000 ~ PM 23:59:59.999999
    """
    try:
        #Yesterday PM23:59:59.999999
        tmp_today = datetime.now(JST).replace(hour=0,minute=0,second=0,microsecond=0) - timedelta(microseconds=1)
        #Yesterday AM00:00:00.000000
        tmp_yesterday = (tmp_today - timedelta(days=1)) + timedelta(microseconds=1)
        #Used as a prefix when outputting S3 logs
        target_date = tmp_yesterday.strftime(DATE_FORMAT)
        #Convert to time stamp type for log output (take up to microseconds)
        today = int(tmp_today.timestamp() * 1000)
        yesterday = int(tmp_yesterday.timestamp() * 1000)


        #Get CloudWatchLogGroup from environment variable
        logGroups = os.environ["LOG_GROUPS"].split(",")
        for logGroupName in logGroups:
            try:
                keys = ["logGroupName","yesterday","today","target_date"]
                values = [logGroupName,yesterday,today,target_date]
                payload = dict(zip(keys,values))

                #Execute log output
                response = logs.create_export_task(
                    logGroupName = payload["logGroupName"],
                    fromTime = payload["yesterday"],
                    to = payload["today"],
                    destination = BUCKET_NAME,
                    destinationPrefix = "Logs" + payload["logGroupName"] + "/" + payload["target_date"]
                )

                #Wait for the log output to finish executing.
                taskId = response["taskId"]
                while True:
                    response = logs.describe_export_tasks(
                        taskId = taskId
                    )
                    status = response["exportTasks"][0]["status"]["code"]
                    #Break if task execution is finished
                    if status != "PENDING" and status != "PENDING_CANCEL" and status != "RUNNING":
                        logger.info(f"taskId {taskId} has finished exporting")
                        break
                    else:
                        logger.info(f"taskId {taskId} is now exporting")
                        time.sleep(WAITING_TIME)
                        continue

            except Exception as e:
                traceback.print_exc()
                logger.warning(f"type = {type(e)} , message = {e}",exc_info=True)

    except Exception as e:
        traceback.print_exc()
        logger.error(f"type = {type(e)} , message = {e}",exc_info=True)
        raise

DynamoDB

# -*- coding: utf-8 -*-
import boto3
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("table")
dynamodb_client = boto3.client('dynamodb')
autoscaling_client = boto3.client('application-autoscaling')

#Get 1 item
response = table.get_item(
    Key = {
        "id" : "1"
    }
)
#Register one item
response = table.put_item(
    Item = {
        "id" : "1",
        "key" : "value"
    }
)
#Updated 1 item
response = table.update_item(
    Key = {
        "id" : "1"
    },
    UpdateExpression = "set key2 = :val",
    ExpressionAttributeValues = {
        ":val" : "value2"
    },
    ReturnValues = "UPDATED_NEW"
)
#Delete 1 item
response = table.delete_item(
    Key = {
        "id" : "1"
    }
)


#Delete all items (all records) truncate or delete from table
#Get all data
delete_items = []
parameters   = {}
while True:
    response = table.scan(**parameters)
    delete_items.extend(response["Items"])
    if "LastEvaluatedKey" in response:
        parameters["ExclusiveStartKey"] = response["LastEvaluatedKey"]
    else:
        break
#Key extraction
key_names = [ x["AttributeName"] for x in table.key_schema ]
delete_keys = [ { k:v for k,v in x.items() if k in key_names } for x in delete_items ]
#Data deletion
with table.batch_writer() as batch:
    for key in delete_keys:
        batch.delete_item(Key = key)



#Delete table
response = dynamodb_client.list_tables()
if 'TableNames' in response:
    for table_name in response['TableNames']:
        if table_name == "Table name to be deleted":
            dynamodb_client.delete_table(TableName = table_name)
            waiter = dynamodb_client.get_waiter("table_not_exists")
            waiter.wait(TableName = table_name)
            #If you have Target Tracking type Auto Scaling, you can delete CloudWatch Alarm at the same time by deleting ScalingPolicy.
            try:
                autoscaling_client.delete_scaling_policy(
                    PolicyName = f'{table_name}ReadCapacity',
                    ServiceNamespace = "dynamodb",
                    ResourceId = f"table/{table_name}",
                    ScalableDimension = "dynamodb:table:ReadCapacityUnits"
                )
            except autoscaling_client.exceptions.ObjectNotFoundException as e:
                print(f"type = {type(e)}, message = {e}")
            try:
                autoscaling_client.delete_scaling_policy(
                    PolicyName = f'{table_name}WriteCapacity',
                    ServiceNamespace = "dynamodb",
                    ResourceId = f"table/{table_name}",
                    ScalableDimension = "dynamodb:table:WriteCapacityUnits"
                )
            except autoscaling_client.exceptions.ObjectNotFoundException as e:
                print(f"type = {type(e)}, message = {e}")



#Create table
table_name = "table"
dynamodb.create_table(
    TableName = table_name,
    KeySchema = [{
        "AttributeName" : "id",
        "KeyType" : "HASH"
    }],
    AttributeDefinitions = [{
        "AttributeName" : "id",
        "AttributeType" : "S"
    }],
    ProvisionedThroughput = {
        "ReadCapacityUnits" : 1,
        "WriteCapacityUnits" : 1
    }
)
waiter = dynamodb_client.get_waiter("table_exists")
waiter.wait(TableName = table_name)
#Target Tracking type Auto Scaling settings
autoscaling_client.register_scalable_target(
    ServiceNamespace = "dynamodb",
    ResourceId = f"table/{table_name}",
    ScalableDimension = "dynamodb:table:ReadCapacityUnits",
    MinCapacity = 1,
    MaxCapacity = 10,
    RoleARN = "IAM Role ARN for Auto Scaling"
)
autoscaling_client.register_scalable_target(
    ServiceNamespace = "dynamodb",
    ResourceId = f"table/{table_name}",
    ScalableDimension = "dynamodb:table:WriteCapacityUnits",
    MinCapacity = 1,
    MaxCapacity = 10,
    RoleARN = "IAM Role ARN for Auto Scaling"
)
#Set scaling policy
autoscaling_client.put_scaling_policy(
    ServiceNamespace='dynamodb',
    ResourceId = f"table/{table_name}",
    PolicyType = "TargetTrackingScaling",
    PolicyName = f"{table_name}ReadCapacity",
    ScalableDimension = "dynamodb:table:ReadCapacityUnits",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue" : 70,
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "DynamoDBReadCapacityUtilization"
        },
       "ScaleOutCooldown" : 70,
       "ScaleInCooldown" : 70
   }
)
autoscaling_client.put_scaling_policy(
    ServiceNamespace='dynamodb',
    ResourceId = f"table/{table_name}",
    PolicyType = "TargetTrackingScaling",
    PolicyName = f"{table_name}WriteCapacity",
    ScalableDimension='dynamodb:table:WriteCapacityUnits',
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue" : 70,
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "DynamoDBWriteCapacityUtilization"
        },
       "ScaleOutCooldown" : 70,
       "ScaleInCooldown" : 70
   }
)



#Table schema update
response = dynamodb_client.update_table(
    AttributeDefinitions = [
        {
            'AttributeName': 'string',
            'AttributeType': 'S'|'N'|'B'
        },
    ],
    TableName = 'string',
    BillingMode = 'PROVISIONED'|'PAY_PER_REQUEST',
    ProvisionedThroughput = {
        'ReadCapacityUnits': 123,
        'WriteCapacityUnits': 123
    }
)


#Batch processing
table = dynamodb.Table("table")
with table.batch_writer() as batch:
    for i in range(10 ** 6):
        batch.put_item(
            Item = {
                "id" : str(i + 1),
                "key" : f"key{i + 1}"
            }
        )

Kinesis Data Stream

# -*- coding: utf-8 -*-
import boto3
kinesis = boto3.client("kinesis")

#Data transmission
kinesis.put_records(
    Records = [
        {
            "Data" : b"String",
            "PartitionKey" : "String"
        }
    ],
    StreamName = "Kinesis Data Stream Name"
) 

Lambda

# -*- coding: utf-8 -*-
import boto3
lambda_client = boto3.client('lambda ')

#Lambda run
response = lambda_client.invoke(
    FunctionName = 'Lambda to be processed',
    InvocationType = 'Event'|'RequestResponse'|'DryRun',
    LogType = 'None'|'Tail',
    Payload = b'bytes'|file
)


#Event Source Mappings Enable or Disable
import time
expected_status = "Enabled" #In case of invalidation, Disabled
progressing_status = "Enabling" #In case of invalidation, Disabling
Enabled = True #False for invalidation
response = lambda_client.list_event_source_mappings(
    EventSourceArn = "Kinesis or DynamoDB ARN to be processed",
    FunctionName = "Lambda function name to be processed"
)
if "EventSourceMappings" in response:
    for e in response["EventSourceMappings"]:
        if e["State"] != expected_status or e["State"] != progressing_status:
            response = lambda_client.update_event_source_mapping(
                UUID = e["UUID"],
                FunctionName = e["FunctionArn"],
                Enabled = Enabled,
                BatchSize = 100
            )
        if response["State"] != expected_status:
            while True:
                response = lambda_client.get_event_source_mapping(
                    UUID = e["UUID"]
                )
                if response["State"] == expected_status:
                    break
                time.sleep(10)

SageMaker

# -*- coding: utf-8 -*-
import boto3
sagemaker = boto3.client("sagemaker-runtime")

#Access sagemaker endpoint and receive forecast results
response = sagemaker.invoke_endpoint(
    EndpointName = "SageMaker endpoint name",
    Body=b'bytes'|file,
    ContentType = 'text/csv', #The MIME type of the input data in the request body.
    Accept = 'application/json' #The desired MIME type of the inference in the response.
)

SQS


# -*- coding: utf-8 -*-
import boto3
sqs = boto3.client('sqs')
QUEUE_URL= "SQS queue URL. You can check it on the console."

#Get all messages from the SQS queue.
while True:
    sqs_message = sqs.receive_message(
        QueueUrl = QUEUE_URL,
        MaxNumberOfMessages = 10
    )
    if "Messages" in sqs_message:
        for message in sqs_message["Messages"]:
            try:
                print(message)
                #Delete the acquired message. Otherwise you may get duplicates
                sqs.delete_message(
                    QueueUrl = QUEUE_URL,
                    ReceiptHandle = message["ReceiptHandle"]
                )
            except Exception as e:
                print(f"type = {type(e)} , message = {e}")

S3


# -*- coding: utf-8 -*-
import boto3
s3 = boto3.client('s3')
BUCKET_NAME = "Bucket name to be processed"

#1 Object (file) writing
s3.put_object(
    Bucket = BUCKET_NAME,
    Body = "Data content. str type or bytes type",
    Key = "S3 key. s3:://Bucket name/Subsequent directories and file names"
)
#1 Read object (file)
s3.get_object(
    Bucket = BUCKET_NAME,
    Key = "S3 key. s3:://Bucket name/Subsequent directories and file names"
)
#1 Delete object (file)
s3.delete_object(
    Bucket = BUCKET_NAME,
    Key = "S3 key. s3:://Bucket name/Subsequent directories and file names"
)
#Copy one object (file) to another location
s3.copy_object(
    Bucket = BUCKET_NAME,
    Key = "Destination S3 key. s3:://Bucket name/Subsequent directories and file names",
    CopySource = {
        "Bucket" : "Source bucket name",
        "Key" : "Migration source S3 key"
    }
)


#Get all objects (files) below the specified prefix or in the bucket
BUCKET_NAME= "Bucket name to be processed"
contents = []
kwargs = {
    "Bucket" : BUCKET_NAME,
    "Prefix" : "Search target prefix"
} 
while True:
    response = s3.list_objects_v2(**kwargs)
    if "Contents" in response:
        contents.extend(response["Contents"])
        if 'NextContinuationToken' in response:
            kwargs["ContinuationToken"] = response['NextContinuationToken']
            continue
    break

Recommended Posts

Boto3 (manipulate AWS resources with Python library) API that is often used privately
Grammar summary that is often forgotten with matplotlib
LINE BOT with Python + AWS Lambda + API Gateway
Authentication information used by Boto3 (AWS SDK for Python)
String manipulation with python & pandas that I often use
Note that writing like this with ruby is writing like this with python
Manipulate S3 objects with Boto3 (high-level API and low-level API)
Python knowledge notes that can be used with AtCoder
AWS CDK with Python
Display only the resources created when acquiring AWS resources with Boto3
[AWS] Try adding Python library to Layer with SAM + Lambda (Python)
[Python] Make AWS resources mocked with Moto into pytest fixtures
Create API with Python, lambda, API Gateway quickly using AWS SAM