My favorite boto3 (Python) API sample

Below are some of the boto3 (AWS SDK for Python) APIs that I often use, taken from the official reference: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/index.html

Replace the resource names and the various setting values with your own as appropriate.

Athena

# -*- coding: utf-8 -*-
import boto3
import time
athena = boto3.client("athena")

class QueryFailed(Exception):
    """
    Exception raised when an Athena query execution fails.
    """
    pass

#Start the query as an asynchronous process
start_query_response = athena.start_query_execution(
    QueryString = 'Query',
    QueryExecutionContext={
        "Database": "Your GlueDB name"
    },
    ResultConfiguration={
        "OutputLocation" : "s3://xxx/yyy"
    }
)
query_execution_id = start_query_response["QueryExecutionId"]
#Check the query status
while True:
    query_status = athena.get_query_execution(
        QueryExecutionId = query_execution_id
    )
    query_execution_status = query_status['QueryExecution']['Status']['State']
    if query_execution_status == 'SUCCEEDED':
        break
    elif query_execution_status == "FAILED" or query_execution_status == "CANCELLED":
        raise QueryFailed(f"query_execution_status = {query_execution_status}")
    else:
        time.sleep(10)
#Get the query results (only valid once the query has succeeded)
query_results = athena.get_query_results(QueryExecutionId = query_execution_id)
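
get_query_results returns at most 1,000 rows per call, so for larger result sets you can paginate instead. A minimal sketch using a paginator (variable names are just illustrative):

#Collect all result rows; the paginator handles NextToken for you
rows = []
paginator = athena.get_paginator("get_query_results")
for page in paginator.paginate(QueryExecutionId = query_execution_id):
    rows.extend(page["ResultSet"]["Rows"])
#Note: the first row usually contains the column headers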

CloudWatchLogs

CloudWatch Logs sample

# -*- coding: utf-8 -*-
from datetime import datetime,timezone,timedelta
import os
import boto3
import time
from logging import getLogger, StreamHandler, DEBUG, INFO, WARNING, ERROR, CRITICAL
import traceback

#logger setting
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(DEBUG)
logger.setLevel(os.getenv("LogLevel", DEBUG))
logger.addHandler(handler)
logger.propagate = False

logs = boto3.client("logs")
BUCKET_NAME = os.environ["BUCKET_NAME"]
WAITING_TIME = int(os.environ["WAITING_TIME"])

#define timezone as JST
JST = timezone(timedelta(hours=9),"JST")

#Date format used in the S3 output prefix
DATE_FORMAT = "%Y-%m-%d"

def lambda_handler(event, context):
    """
    Export one day's worth of CloudWatch Logs to S3.
    The target range is the previous day, 00:00:00.000000 to 23:59:59.999999 (JST).
    """
    try:
        #End of the previous day (23:59:59.999999)
        tmp_today = datetime.now(JST).replace(hour=0,minute=0,second=0,microsecond=0) - timedelta(microseconds=1)
        #Start of the previous day (00:00:00.000000)
        tmp_yesterday = (tmp_today - timedelta(days=1)) + timedelta(microseconds=1)
        #Used as a prefix when outputting S3 logs
        target_date = tmp_yesterday.strftime(DATE_FORMAT)
        #Convert to epoch milliseconds, which create_export_task expects
        today = int(tmp_today.timestamp() * 1000)
        yesterday = int(tmp_yesterday.timestamp() * 1000)


        #Get the target CloudWatch Logs log groups from the environment variable
        logGroups = os.environ["LOG_GROUPS"].split(",")
        for logGroupName in logGroups:
            try:
                keys = ["logGroupName","yesterday","today","target_date"]
                values = [logGroupName,yesterday,today,target_date]
                payload = dict(zip(keys,values))

                #Export the log group to S3 (create_export_task is asynchronous)
                response = logs.create_export_task(
                    logGroupName = payload["logGroupName"],
                    fromTime = payload["yesterday"],
                    to = payload["today"],
                    destination = BUCKET_NAME,
                    destinationPrefix = "Logs" + payload["logGroupName"] + "/" + payload["target_date"]
                )
                
                taskId = response["taskId"]
                while True:
                    response = logs.describe_export_tasks(
                        taskId = taskId
                    )
                    status = response["exportTasks"][0]["status"]["code"]
                    if status != "PENDING" and status != "PENDING_CANCEL" and status != "RUNNING":
                        logger.info(f"taskId {taskId} has finished exporting")
                        break
                    else:
                        logger.info(f"taskId {taskId} is now exporting")
                        time.sleep(WAITING_TIME)
                        continue

            except Exception as e:
                traceback.print_exc()
                logger.warning(f"type = {type(e)} , message = {e}",exc_info=True)

    except Exception as e:
        traceback.print_exc()
        logger.error(f"type = {type(e)} , message = {e}",exc_info=True)
        raise

DynamoDB

DynamoDB sample
# -*- coding: utf-8 -*-
import boto3
from boto3.dynamodb.conditions import Key,Attr

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("table")
dynamodb_client = boto3.client('dynamodb')
autoscaling_client = boto3.client('application-autoscaling')

"""
DML  select update insert delete
"""
#get a record
response = table.get_item(
    Key = {
        "id" : "1"
    }
)
#write a record
response = table.put_item(
    Item = {
        "id" : "1",
        "key" : "value"
    }
)
#update a record
response = table.update_item(
    Key = {
        "id" : "1"
    },
    UpdateExpression = "set #age = :val",
    ExpressionAttributeNames = {
        "#age" : "age"
    },
    ExpressionAttributeValues = {
        ":val" : 95
    },
    ReturnValues = "UPDATED_NEW"
)
#delete a record
response = table.delete_item(
    Key = {
        "id" : "1"
    }
)
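
Key and Attr imported above are used for query/scan conditions. A small sketch, assuming a table whose hash key is "id" and which has an "age" attribute (the attribute names are only examples):

#query records by a key condition
response = table.query(
    KeyConditionExpression = Key("id").eq("1")
)
#scan with a filter on a non-key attribute
response = table.scan(
    FilterExpression = Attr("age").gte(20)
)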



#write records as a batch process
with table.batch_writer() as batch:
    for i in range(10 ** 6):
        batch.put_item(
            Item = {
                "id" : str(i + 1),
                "key" : f"key{i + 1}"
            }
        )


# Truncate the table: scan all items, then delete them
delete_items = []
parameters   = {}
while True:
    response = table.scan(**parameters)
    delete_items.extend(response["Items"])
    if "LastEvaluatedKey" in response:
        parameters["ExclusiveStartKey"] = response["LastEvaluatedKey"]
    else:
        break
# get the hash key and range key
key_names = [ x["AttributeName"] for x in table.key_schema ]
delete_keys = [ { k:v for k,v in x.items() if k in key_names } for x in delete_items ]
#Delete the items as a batch process
with table.batch_writer() as batch:
    for key in delete_keys:
        batch.delete_item(Key = key)



"""
DDL  Table Process
"""
#Delete a table
response = dynamodb_client.list_tables()
if 'TableNames' in response:
    for table_name in response['TableNames']:
        if table_name == "Table name to be deleted":
            dynamodb_client.delete_table(TableName = table_name)
            waiter = dynamodb_client.get_waiter("table_not_exists")
            waiter.wait(TableName = table_name)
            #If the table uses target-tracking Auto Scaling, deleting the scaling policy also removes the associated CloudWatch alarms.
            try:
                autoscaling_client.delete_scaling_policy(
                    PolicyName = f'{table_name}ReadCapacity',
                    ServiceNamespace = "dynamodb",
                    ResourceId = f"table/{table_name}",
                    ScalableDimension = "dynamodb:table:ReadCapacityUnits"
                )
            except autoscaling_client.exceptions.ObjectNotFoundException as e:
                print(f"type = {type(e)}, message = {e}")
            try:
                autoscaling_client.delete_scaling_policy(
                    PolicyName = f'{table_name}WriteCapacity',
                    ServiceNamespace = "dynamodb",
                    ResourceId = f"table/{table_name}",
                    ScalableDimension = "dynamodb:table:WriteCapacityUnits"
                )
            except autoscaling_client.exceptions.ObjectNotFoundException as e:
                print(f"type = {type(e)}, message = {e}")



#Create a table
table_name = "table"
dynamodb.create_table(
    TableName = table_name,
    KeySchema = [{
        "AttributeName" : "id",
        "KeyType" : "HASH"
    }],
    AttributeDefinitions = [{
        "AttributeName" : "id",
        "AttributeType" : "S"
    }],
    ProvisionedThroughput = {
        "ReadCapacityUnits" : 1,
        "WriteCapacityUnits" : 1
    }
)
waiter = dynamodb_client.get_waiter("table_exists")
waiter.wait(TableName = table_name)
# AutoScaling setting of Target Tracking
autoscaling_client.register_scalable_target(
    ServiceNamespace = "dynamodb",
    ResourceId = f"table/{table_name}",
    ScalableDimension = "dynamodb:table:ReadCapacityUnits",
    MinCapacity = 1,
    MaxCapacity = 10,
    RoleARN = "IAM Role ARN for Auto Scaling"
)
autoscaling_client.register_scalable_target(
    ServiceNamespace = "dynamodb",
    ResourceId = f"table/{table_name}",
    ScalableDimension = "dynamodb:table:WriteCapacityUnits",
    MinCapacity = 1,
    MaxCapacity = 10,
    RoleARN = "IAM Role ARN for Auto Scaling"
)
# set up the autoscaling policy
autoscaling_client.put_scaling_policy(
    ServiceNamespace='dynamodb',
    ResourceId = f"table/{table_name}",
    PolicyType = "TargetTrackingScaling",
    PolicyName = f"{table_name}ReadCapacity",
    ScalableDimension = "dynamodb:table:ReadCapacityUnits",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue" : 70,
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "DynamoDBReadCapacityUtilization"
        },
       "ScaleOutCooldown" : 70,
       "ScaleInCooldown" : 70
   }
)
autoscaling_client.put_scaling_policy(
    ServiceNamespace='dynamodb',
    ResourceId = f"table/{table_name}",
    PolicyType = "TargetTrackingScaling",
    PolicyName = f"{table_name}WriteCapacity",
    ScalableDimension='dynamodb:table:WriteCapacityUnits',
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue" : 70,
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "DynamoDBWriteCapacityUtilization"
        },
       "ScaleOutCooldown" : 70,
       "ScaleInCooldown" : 70
   }
)



#Update the table schema
response = dynamodb_client.update_table(
    AttributeDefinitions = [
        {
            'AttributeName': 'string',
            'AttributeType': 'S' # or 'N' / 'B'
        },
    ],
    TableName = 'string',
    BillingMode = 'PROVISIONED', # or 'PAY_PER_REQUEST'
    ProvisionedThroughput = {
        'ReadCapacityUnits': 123,
        'WriteCapacityUnits': 123
    }
)

EMR

# -*- coding: utf-8 -*-
import boto3

#EMR
emr = boto3.client("emr")

#show cluster list
cluster_list = emr.list_clusters(
    ClusterStates = ["STARTING","BOOTSTRAPPING","RUNNING","WAITING"]
)

#Set up Cluster
params = {
    #https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.run_job_flow
}
response = emr.run_job_flow(**params)

#describe cluster
response = emr.describe_cluster(
    ClusterId = "The identifier of the cluster to describe."
)


#Add steps
step = {
    #https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.add_job_flow_steps
}
response = emr.add_job_flow_steps(
    JobFlowId = "A string that uniquely identifies the job flow. This identifier is returned by RunJobFlow and can also be obtained from ListClusters .",
    Steps = [step]
)
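
If you want to block until the added steps finish, you can use a waiter on each returned step ID. A minimal sketch:

#add_job_flow_steps returns the IDs of the added steps
step_ids = response["StepIds"]
waiter = emr.get_waiter("step_complete")
for step_id in step_ids:
    waiter.wait(
        ClusterId = "Cluster ID",
        StepId = step_id
    )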

Kinesis Data Stream

# -*- coding: utf-8 -*-
import boto3
kinesis = boto3.client("kinesis")

#Send records to the stream
response = kinesis.put_records(
    Records = [
        {
            "Data" : b"String",
            "PartitionKey" : "String"
        }
    ],
    StreamName = "Kinesis Data Stream Name"
) 
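
put_records can partially fail even when the call itself succeeds, so it is worth checking FailedRecordCount in the response. A minimal sketch (retry handling is left out):

#Records rejected by the service (e.g. due to throttling) carry an ErrorCode
if response["FailedRecordCount"] > 0:
    failed = [r for r in response["Records"] if "ErrorCode" in r]
    print(f"{response['FailedRecordCount']} records failed: {failed}")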

Lambda

# -*- coding: utf-8 -*-
import boto3
lambda_client = boto3.client('lambda')

#Invoke Lambda
response = lambda_client.invoke(
    FunctionName = 'Lambda Function Name',
    InvocationType = 'RequestResponse', # or 'Event' / 'DryRun'
    LogType = 'Tail', # or 'None'
    Payload = b'{"key": "value"}' # bytes or a file-like object
)
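
With a synchronous 'RequestResponse' invocation, the function's return value comes back as a streaming body. A small sketch for reading it, assuming the function returns JSON:

import json
#Payload is a streaming body; read and decode it to get the function's return value
if response["StatusCode"] == 200:
    result = json.loads(response["Payload"].read().decode("utf-8"))
    print(result)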


#Event Source Mappings enable or disable
import time
expected_status = "Enabled" #when disable, Disabled
progressing_status = "Enabling" #when disable, Disabling
Enabled = True #when disable, False
response = lambda_client.list_event_source_mappings(
    EventSourceArn = "Kinesis or DynamoDB ARN",
    FunctionName = "Lambda Function name"
)
if "EventSourceMappings" in response:
    for e in response["EventSourceMappings"]:
        if e["State"] != expected_status or e["State"] != progressing_status:
            response = lambda_client.update_event_source_mapping(
                UUID = e["UUID"],
                FunctionName = e["FunctionArn"],
                Enabled = Enabled,
                BatchSize = 100
            )
        if response["State"] != expected_status:
            while True:
                response = lambda_client.get_event_source_mapping(
                    UUID = e["UUID"]
                )
                if response["State"] == expected_status:
                    break
                time.sleep(10)

SageMaker

# -*- coding: utf-8 -*-
import boto3

sagemaker = boto3.client("sagemaker-runtime")

#Invoke Endpoint and get a predicted result
response = sagemaker.invoke_endpoint(
    EndpointName = "SageMaker Endpoint Name",
    Body = b'1.0,2.0,3.0', #bytes or a file-like object (CSV here to match ContentType)
    ContentType = 'text/csv', #The MIME type of the input data in the request body.
    Accept = 'application/json' #The desired MIME type of the inference in the response.
)
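
The prediction comes back in the Body field as a streaming body; decode it according to the Accept type. A small sketch assuming a JSON response:

import json
#Body is a streaming body; parse it according to the Accept type (JSON here)
result = json.loads(response["Body"].read().decode("utf-8"))
print(result)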

SQS

# -*- coding: utf-8 -*-
import boto3

sqs = boto3.client('sqs')
QUEUE_URL= "SQS Queue URL"

#Get all messages from Queue of SQS
while True:
    sqs_message = sqs.receive_message(
        QueueUrl = QUEUE_URL,
        MaxNumberOfMessages = 10
    )
    if "Messages" in sqs_message:
        for message in sqs_message["Messages"]:
            try:
                print(message)
                #Delete each message after processing it; otherwise it will be delivered again
                sqs.delete_message(
                    QueueUrl = QUEUE_URL,
                    ReceiptHandle = message["ReceiptHandle"]
                )
            except Exception as e:
                print(f"type = {type(e)} , message = {e}")
    else:
        break
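
For completeness, sending a message is a single call. A minimal sketch (the message body is just an example):

#Send a message to the queue
response = sqs.send_message(
    QueueUrl = QUEUE_URL,
    MessageBody = "test message"
)
print(response["MessageId"])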

SSM

# -*- coding: utf-8 -*-
import boto3

ssm = boto3.client('ssm')
parameters = "your SSM parameter"
response = ssm.get_parameters(
    Names=[
        parameters,
    ],
    #Return decrypted values for secure string parameters. This flag is ignored for String and StringList parameter types.
    WithDecryption = True
)
print(response['Parameters'][0]['Value'])
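
When you only need a single parameter, get_parameter is a slightly simpler alternative; note that it raises ParameterNotFound instead of returning an InvalidParameters list. A sketch:

#Single-parameter variant; raises ssm.exceptions.ParameterNotFound if the name does not exist
response = ssm.get_parameter(
    Name = parameters,
    WithDecryption = True
)
print(response["Parameter"]["Value"])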

S3

S3 sample
# -*- coding: utf-8 -*-
import boto3

#kms
kms = boto3.client("kms")

#s3
s3 = boto3.client('s3')

BUCKET_NAME = "Bucket name"

"""
One object Process
"""
#Write object
s3.put_object(
    Bucket = BUCKET_NAME,
    Body = "test body".encode("UTF-8"),
    Key = "S3 key  s3://Bucket Name/Subsequent directories and file names"
)

#Read object
s3.get_object(
    Bucket = BUCKET_NAME,
    Key = "S3 key  s3://Bucket Name/Subsequent directories and file names"
)["Body"].read().decode("UTF-8")

#Delete object
s3.delete_object(
    Bucket = BUCKET_NAME,
    Key = "S3 key  s3://Bucket Name/Subsequent directories and file names"
)

#copy object
s3.copy_object(
    Bucket = BUCKET_NAME,
    Key = "Destination S3 key. s3:://Bucket name/Subsequent directories and file names",
    CopySource = {
        "Bucket" : "Source bucket name",
        "Key" : "Migration source S3 key"
    }
)

"""
Multiple Objects Process
"""
#Get all objects under the specified s3 prefix
contents = []
kwargs = {
    "Bucket" : BUCKET_NAME,
    "Prefix" : "Search target prefix"
} 
while True:
    response = s3.list_objects_v2(**kwargs)
    if "Contents" in response:
        contents.extend(response["Contents"])
        if 'NextContinuationToken' in response:
            kwargs["ContinuationToken"] = response['NextContinuationToken']
            continue
    break
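
The same listing can also be written with a paginator, which handles the continuation token internally. A sketch equivalent to the loop above:

#Paginator version: ContinuationToken handling is done for you
contents = []
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket = BUCKET_NAME, Prefix = "Search target prefix"):
    contents.extend(page.get("Contents", []))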


"""
Server Side Encryption
1. Default Encryption
  1-1. SSE with AES-256
  1-2. SSE with KMS AWS Managed Keys
  1-3. SSE with KMS CMK(Customer Managed Keys)
2. Non Default Encryption
  2-1. SSE with AES-256
  2-2. SSE with KMS AWS Managed Keys
  2-3. SSE with KMS CMK(Customer Managed Keys)
  2-4. SSE-C, with a key the client manages (not managed by S3 or KMS)
"""
#1-1. SSE with AES-256
#1-2. SSE with KMS AWS Managed Keys
#1-3. SSE with KMS CMK(Customer Managed Keys)
response = s3.put_object(
    Bucket = BUCKET_NAME,
    Key = "test",
    Body = "Encrypted".encode("UTF-8")
)
print(f'ServerSideEncryption'.ljust(20) + f' = {response["ServerSideEncryption"]}')
#KMS only: check who manages the key (KeyManager)
if response["ServerSideEncryption"] == "aws:kms":
    KeyManager = kms.describe_key(
        KeyId = response["SSEKMSKeyId"]
    )["KeyMetadata"]["KeyManager"]
    print(f"KeyManager".ljust(20) + f" = {KeyManager}")


#2-1. SSE with AES-256
response = s3.put_object(
    Bucket = BUCKET_NAME,
    Key = "test",
    Body = "Encrypted".encode("UTF-8"),
    ServerSideEncryption = "AES256"
)
#2-2. SSE with KMS AWS Managed Keys
response = s3.put_object(
    Bucket = BUCKET_NAME,
    Key = "test",
    Body = "Encrypted".encode("UTF-8"),
    ServerSideEncryption = "aws:kms"
)
#2-3. SSE with KMS CMK(Customer Managed Keys)
response = s3.put_object(
    Bucket = BUCKET_NAME,
    Key = "test",
    Body = "Encrypted".encode("UTF-8"),
    ServerSideEncryption = "aws:kms",
    SSEKMSKeyId = "Your Customer Manged Key ID"
)
#2-4. SSE-C, with a key the client manages (not managed by S3 or KMS)
response = s3.put_object(
    Bucket = BUCKET_NAME,
    Key = "test",
    Body = "Encrypted".encode("UTF-8"),
    SSECustomerAlgorithm = "AES256",
    SSECustomerKey = "The Key You generated. ex) SSE_CUSTOMER_KEY=$(cat /dev/urandom | base64 -i | fold -w 32 | head -n 1)"
)
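
Note that an object stored with SSE-C (case 2-4) can only be read back by passing the same algorithm and key to get_object. A sketch:

#Objects encrypted with SSE-C must be read with the same customer-provided key
body = s3.get_object(
    Bucket = BUCKET_NAME,
    Key = "test",
    SSECustomerAlgorithm = "AES256",
    SSECustomerKey = "The same key you used when uploading"
)["Body"].read().decode("UTF-8")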
