Boto3 (Python) API examples I often use

Below are some of the boto3 (AWS SDK for Python) APIs I use often. The full service reference is here: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/index.html

Replace the resource names and the various setting values as appropriate for your environment.

Athena

# -*- coding: utf-8 -*-
import boto3
import time
athena = boto3.client("athena")

class QueryFailed(Exception):
    """
    Exception raised when an Athena query execution fails
    """
    pass

#Run the query (start_query_execution is asynchronous)
start_query_response = athena.start_query_execution(
    QueryString = 'Your query',
    QueryExecutionContext={
        "Database": "Your GlueDB name"
    },
    ResultConfiguration={
        "OutputLocation" : "s3://xxx/yyy"
    }
)
query_execution_id = start_query_response["QueryExecutionId"]
#Check the query status
while True:
    query_status = athena.get_query_execution(
        QueryExecutionId = query_execution_id
    )
    query_execution_status = query_status['QueryExecution']['Status']['State']
    if query_execution_status == 'SUCCEEDED':
        break
    elif query_execution_status == "FAILED" or query_execution_status == "CANCELLED":
        raise QueryFailed(f"query_execution_status = {query_execution_status}")
    else:
        time.sleep(10)
#Get the query results (only valid once the query has succeeded)
query_results = athena.get_query_results(QueryExecutionId = query_execution_id)
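
get_query_results returns at most 1,000 rows per call, so larger result sets need paging with NextToken. A minimal sketch (note that the first row of the first page is the column header row):

rows = []
next_token = None
while True:
    kwargs = {"QueryExecutionId" : query_execution_id}
    if next_token:
        kwargs["NextToken"] = next_token
    page = athena.get_query_results(**kwargs)
    rows.extend(page["ResultSet"]["Rows"])
    next_token = page.get("NextToken")
    if next_token is None:
        break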

CloudWatch Logs

# -*- coding: utf-8 -*-
from datetime import datetime,timezone,timedelta
import os
import boto3
import time
from logging import getLogger, StreamHandler, DEBUG, INFO, WARNING, ERROR, CRITICAL
import traceback

#logger setting
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(DEBUG)
logger.setLevel(os.getenv("LogLevel", DEBUG))
logger.addHandler(handler)
logger.propagate = False

logs = boto3.client("logs")
BUCKET_NAME = os.environ["BUCKET_NAME"]
WAITING_TIME = int(os.environ["WAITING_TIME"])

#define timezone as JST
JST = timezone(timedelta(hours=9),"JST")

#Date format used in the S3 output prefix
DATE_FORMAT = "%Y-%m-%d"

def lambda_handler(event, context):
    """
Exportez l'équivalent d'une journée de CloudWatch Logs vers S3.
Le temps cible est le suivant.
    AM 00:00:00.000000 ~ PM 23:59:59.999999
    """
    try:
        #Yesterday 23:59:59.999999
        tmp_today = datetime.now(JST).replace(hour=0,minute=0,second=0,microsecond=0) - timedelta(microseconds=1)
        #Yesterday 00:00:00.000000
        tmp_yesterday = (tmp_today - timedelta(days=1)) + timedelta(microseconds=1)
        #Used as the prefix when writing the logs to S3
        target_date = tmp_yesterday.strftime(DATE_FORMAT)
        #Convert to epoch milliseconds, which create_export_task expects
        today = int(tmp_today.timestamp() * 1000)
        yesterday = int(tmp_yesterday.timestamp() * 1000)


        #Get the CloudWatchLogGroups from your environment variable
        logGroups = os.environ["LOG_GROUPS"].split(",")
        for logGroupName in logGroups:
            try:
                keys = ["logGroupName","yesterday","today","target_date"]
                values = [logGroupName,yesterday,today,target_date]
                payload = dict(zip(keys,values))

                #Output CloudWatchLogs into S3 (Async Process)
                response = logs.create_export_task(
                    logGroupName = payload["logGroupName"],
                    fromTime = payload["yesterday"],
                    to = payload["today"],
                    destination = BUCKET_NAME,
                    destinationPrefix = "Logs" + payload["logGroupName"] + "/" + payload["target_date"]
                )
                
                taskId = response["taskId"]
                while True:
                    response = logs.describe_export_tasks(
                        taskId = taskId
                    )
                    status = response["exportTasks"][0]["status"]["code"]
                    if status != "PENDING" and status != "PENDING_CANCEL" and status != "RUNNING":
                        logger.info(f"taskId {taskId} has finished exporting")
                        break
                    else:
                        logger.info(f"taskId {taskId} is now exporting")
                        time.sleep(WAITING_TIME)
                        continue

            except Exception as e:
                traceback.print_exc()
                logger.warning(f"type = {type(e)} , message = {e}",exc_info=True)

    except Exception as e:
        traceback.print_exc()
        logger.error(f"type = {type(e)} , message = {e}",exc_info=True)
        raise

DynamoDB

# -*- coding: utf-8 -*-
import boto3
from boto3.dynamodb.conditions import Key,Attr

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("table")
dynamodb_client = boto3.client('dynamodb')
autoscaling_client = boto3.client('application-autoscaling')

"""
DML  select update insert delete
"""
#get a record
response = table.get_item(
    Key = {
        "id" : "1"
    }
)
#write a record
response = table.put_item(
    Item = {
        "id" : "1",
        "key" : "value"
    }
)
#update a record
response = table.update_item(
    Key = {
        "id" : "1"
    },
    UpdateExpression = "set #age = :val",
    ExpressionAttributeNames = {
        "#age" : "age"
    },
    ExpressionAttributeValues = {
        ":val" : 95
    },
    ReturnValues = "UPDATED_NEW"
)
#delete a record
response = table.delete_item(
    Key = {
        "id" : "1"
    }
)
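
#query records by key condition (a sketch using the Key import above;
#assumes "id" is the table's partition key)
response = table.query(
    KeyConditionExpression = Key("id").eq("1")
)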



#write records as a batch process
with table.batch_writer() as batch:
    for i in range(10 ** 6):
        batch.put_item(
            Item = {
                "id" : str(i + 1),
                "key" : f"key{i + 1}"
            }
        )


# truncate the table (scan all keys, then delete them in batches)
delete_items = []
parameters   = {}
while True:
    response = table.scan(**parameters)
    delete_items.extend(response["Items"])
    if "LastEvaluatedKey" in response:
        parameters["ExclusiveStartKey"] = response["LastEvaluatedKey"]
    else:
        break
# get the hash key and range key
key_names = [ x["AttributeName"] for x in table.key_schema ]
delete_keys = [ { k:v for k,v in x.items() if k in key_names } for x in delete_items ]
#delete the records as a batch process
with table.batch_writer() as batch:
    for key in delete_keys:
        batch.delete_item(Key = key)



"""
DDL  Table Process
"""
#Delete a table
response = dynamodb_client.list_tables()
if 'TableNames' in response:
    for table_name in response['TableNames']:
        if table_name == "Name of the table to delete":
            dynamodb_client.delete_table(TableName = table_name)
            waiter = dynamodb_client.get_waiter("table_not_exists")
            waiter.wait(TableName = table_name)
            #If the table uses target-tracking Auto Scaling, deleting the ScalingPolicy also removes the associated CloudWatch alarms.
            try:
                autoscaling_client.delete_scaling_policy(
                    PolicyName = f'{table_name}ReadCapacity',
                    ServiceNamespace = "dynamodb",
                    ResourceId = f"table/{table_name}",
                    ScalableDimension = "dynamodb:table:ReadCapacityUnits"
                )
            except autoscaling_client.exceptions.ObjectNotFoundException as e:
                print(f"type = {type(e)}, message = {e}")
            try:
                autoscaling_client.delete_scaling_policy(
                    PolicyName = f'{table_name}WriteCapacity',
                    ServiceNamespace = "dynamodb",
                    ResourceId = f"table/{table_name}",
                    ScalableDimension = "dynamodb:table:WriteCapacityUnits"
                )
            except autoscaling_client.exceptions.ObjectNotFoundException as e:
                print(f"type = {type(e)}, message = {e}")



#Create a table
table_name = "table"
dynamodb.create_table(
    TableName = table_name,
    KeySchema = [{
        "AttributeName" : "id",
        "KeyType" : "HASH"
    }],
    AttributeDefinitions = [{
        "AttributeName" : "id",
        "AttributeType" : "S"
    }],
    ProvisionedThroughput = {
        "ReadCapacityUnits" : 1,
        "WriteCapacityUnits" : 1
    }
)
waiter = dynamodb_client.get_waiter("table_exists")
waiter.wait(TableName = table_name)
# Auto Scaling (target tracking) settings
autoscaling_client.register_scalable_target(
    ServiceNamespace = "dynamodb",
    ResourceId = f"table/{table_name}",
    ScalableDimension = "dynamodb:table:ReadCapacityUnits",
    MinCapacity = 1,
    MaxCapacity = 10,
    RoleARN = "ARN de rôle IAM pour Auto Scaling"
)
autoscaling_client.register_scalable_target(
    ServiceNamespace = "dynamodb",
    ResourceId = f"table/{table_name}",
    ScalableDimension = "dynamodb:table:WriteCapacityUnits",
    MinCapacity = 1,
    MaxCapacity = 10,
    RoleARN = "ARN de rôle IAM pour Auto Scaling"
)
# set up the autoscaling policy
autoscaling_client.put_scaling_policy(
    ServiceNamespace='dynamodb',
    ResourceId = f"table/{table_name}",
    PolicyType = "TargetTrackingScaling",
    PolicyName = f"{table_name}ReadCapacity",
    ScalableDimension = "dynamodb:table:ReadCapacityUnits",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue" : 70,
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "DynamoDBReadCapacityUtilization"
        },
       "ScaleOutCooldown" : 70,
       "ScaleInCooldown" : 70
   }
)
autoscaling_client.put_scaling_policy(
    ServiceNamespace='dynamodb',
    ResourceId = f"table/{table_name}",
    PolicyType = "TargetTrackingScaling",
    PolicyName = f"{table_name}WriteCapacity",
    ScalableDimension='dynamodb:table:WriteCapacityUnits',
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue" : 70,
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "DynamoDBWriteCapacityUtilization"
        },
       "ScaleOutCooldown" : 70,
       "ScaleInCooldown" : 70
   }
)



#update the table settings (the API-reference placeholders made concrete;
#AttributeDefinitions is only needed when you also add or change indexes)
response = dynamodb_client.update_table(
    TableName = table_name,
    BillingMode = 'PROVISIONED', #or 'PAY_PER_REQUEST'
    ProvisionedThroughput = {
        'ReadCapacityUnits': 2,
        'WriteCapacityUnits': 2
    }
)

EMR

# -*- coding: utf-8 -*-
import boto3

#EMR
emr = boto3.client("emr")

#show cluster list
cluster_list = emr.list_clusters(
    ClusterStates = ["STARTING","BOOTSTRAPPING","RUNNING","WAITING"]
)

#Set up Cluster
params = {
    #https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.run_job_flow
}
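#For illustration, a minimal set of run_job_flow parameters (a sketch; the
#release label, instance types, and role names below are assumptions)
params = {
    "Name": "my-cluster",
    "ReleaseLabel": "emr-5.30.0",
    "Instances": {
        "MasterInstanceType": "m5.xlarge",
        "SlaveInstanceType": "m5.xlarge",
        "InstanceCount": 3,
        "KeepJobFlowAliveWhenNoSteps": True
    },
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole"
}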
response = emr.run_job_flow(**params)

#describe cluster
response = emr.describe_cluster(
    ClusterId = "The identifier of the cluster to describe."
)


#Add steps
step = {
    #https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.add_job_flow_steps
}
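#For illustration, a minimal step definition (a sketch; the JAR and
#arguments below are assumptions)
step = {
    "Name": "my-step",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": ["spark-submit", "s3://xxx/yyy/job.py"]
    }
}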
response = emr.add_job_flow_steps(
    JobFlowId = "A string that uniquely identifies the job flow. This identifier is returned by RunJobFlow and can also be obtained from ListClusters .",
    Steps = [step]
)

Kinesis Data Stream

# -*- coding: utf-8 -*-
import boto3
kinesis = boto3.client("kinesis")

#Send data
kinesis.put_records(
    Records = [
        {
            "Data" : b"String",
            "PartitionKey" : "String"
        }
    ],
    StreamName = "Kinesis Data Stream Name"
) 
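
For the consuming side, a minimal single-shard sketch (this assumes a small stream where reading only the first shard is acceptable):

#Read records from the first shard of the stream
stream_name = "Kinesis Data Stream Name"
shard_id = kinesis.describe_stream(StreamName = stream_name)["StreamDescription"]["Shards"][0]["ShardId"]
shard_iterator = kinesis.get_shard_iterator(
    StreamName = stream_name,
    ShardId = shard_id,
    ShardIteratorType = "TRIM_HORIZON"
)["ShardIterator"]
records = kinesis.get_records(ShardIterator = shard_iterator, Limit = 100)
for record in records["Records"]:
    print(record["Data"])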

Lambda

# -*- coding: utf-8 -*-
import boto3
lambda_client = boto3.client('lambda')

#Invoke Lambda
response = lambda_client.invoke(
    FunctionName = 'Lambda Function Name',
    InvocationType = 'RequestResponse', #or 'Event' (async) / 'DryRun'
    LogType = 'Tail', #or 'None'
    Payload = b'{}' #bytes or a file-like object
)
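
#Assuming InvocationType = 'RequestResponse' and a function that returns
#JSON, the return value can be read from the Payload stream (a sketch):
import json
result = json.loads(response["Payload"].read())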


#Enable or disable event source mappings
import time
expected_status = "Enabled" #"Disabled" when disabling
progressing_status = "Enabling" #"Disabling" when disabling
enabled = True #False when disabling
response = lambda_client.list_event_source_mappings(
    EventSourceArn = "Kinesis or DynamoDB ARN",
    FunctionName = "Lambda Function name"
)
if "EventSourceMappings" in response:
    for e in response["EventSourceMappings"]:
        if e["State"] != expected_status or e["State"] != progressing_status:
            response = lambda_client.update_event_source_mapping(
                UUID = e["UUID"],
                FunctionName = e["FunctionArn"],
                Enabled = Enabled,
                BatchSize = 100
            )
        if response["State"] != expected_status:
            while True:
                response = lambda_client.get_event_source_mapping(
                    UUID = e["UUID"]
                )
                if response["State"] == expected_status:
                    break
                time.sleep(10)

SageMaker

# -*- coding: utf-8 -*-
import boto3

sagemaker = boto3.client("sagemaker-runtime")

#Invoke Endpoint and get a predicted result
response = sagemaker.invoke_endpoint(
    EndpointName = "SageMaker Endpoint Name",
    Body = b"your request payload bytes", #bytes or a seekable file-like object
    ContentType = 'text/csv', #The MIME type of the input data in the request body.
    Accept = 'application/json' #The desired MIME type of the inference in the response.
)
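
The inference result comes back in the Body stream; assuming a JSON response, it can be read like this:

import json
result = json.loads(response["Body"].read().decode("utf-8"))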

SQS

# -*- coding: utf-8 -*-
import boto3

sqs = boto3.client('sqs')
QUEUE_URL= "SQS Queue URL"

#Receive and process all messages currently in the SQS queue
while True:
    sqs_message = sqs.receive_message(
        QueueUrl = QUEUE_URL,
        MaxNumberOfMessages = 10
    )
    if "Messages" in sqs_message:
        for message in sqs_message["Messages"]:
            try:
                print(message)
                #Delete each message after processing; otherwise it becomes visible again and is delivered repeatedly
                sqs.delete_message(
                    QueueUrl = QUEUE_URL,
                    ReceiptHandle = message["ReceiptHandle"]
                )
            except Exception as e:
                print(f"type = {type(e)} , message = {e}")
    else:
        break
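
For the sending side, a minimal sketch:

#Send a message to the queue
sqs.send_message(
    QueueUrl = QUEUE_URL,
    MessageBody = "test body"
)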

SSM

# -*- coding: utf-8 -*-
import boto3

ssm = boto3.client('ssm')
parameters = "your SSM parameter"
response = ssm.get_parameters(
    Names=[
        parameters,
    ],
    #Return decrypted values for secure string parameters. This flag is ignored for String and StringList parameter types.
    WithDecryption = True
)
print(response['Parameters'][0]['Value'])
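
Writing a parameter back is symmetrical; a minimal sketch (the value and type here are assumptions):

#Write a parameter (a sketch)
ssm.put_parameter(
    Name = parameters,
    Value = "your value",
    Type = "SecureString",
    Overwrite = True
)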

S3

# -*- coding: utf-8 -*-
import boto3

#kms
kms = boto3.client("kms")

#s3
s3 = boto3.client('s3')

BUCKET_NAME = "Bucket name"

"""
Single-object operations
"""
#Write object
s3.put_object(
    Bucket = BUCKET_NAME,
    Body = "test body".encode("UTF-8"),
    Key = "S3 key  s3://Bucket Name/Répertoires et noms de fichiers suivants"
)

#Read object
s3.get_object(
    Bucket = BUCKET_NAME,
    Key = "S3 key  s3://Bucket Name/Répertoires et noms de fichiers suivants"
)["Body"].read().decode("UTF-8")

#Delete object
s3.delete_object(
    Bucket = BUCKET_NAME,
    Key = "S3 key  s3://Bucket Name/Répertoires et noms de fichiers suivants"
)

#copy object
s3.copy_object(
    Bucket = BUCKET_NAME,
    Key = "Clé de destination S3. s3:://Nom du godet/Répertoires et noms de fichiers suivants",
    CopySource = {
        "Bucket" : "Nom du compartiment source",
        "Key" : "Clé S3 de la source de migration"
    }
)

"""
Multiple-object operations
"""
#Get all objects under the specified s3 prefix
contents = []
kwargs = {
    "Bucket" : BUCKET_NAME,
    "Prefix" : "Préfixe de la cible de recherche"
} 
while True:
    response = s3.list_objects_v2(**kwargs)
    contents.extend(response.get("Contents", []))
    if "NextContinuationToken" in response:
        kwargs["ContinuationToken"] = response["NextContinuationToken"]
    else:
        break
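
#The same listing can be written more concisely with boto3's built-in paginator
paginator = s3.get_paginator("list_objects_v2")
contents = []
for page in paginator.paginate(Bucket = BUCKET_NAME, Prefix = "Prefix to search under"):
    contents.extend(page.get("Contents", []))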


"""
Server Side Encryption
1. Default Encryption
  1-1. SSE with AES-256
  1-2. SSE with KMS AWS Managed Keys
  1-3. SSE with KMS CMK(Customer Managed Keys)
2. Non Default Encryption
  2-1. SSE with AES-256
  2-2. SSE with KMS AWS Managed Keys
  2-3. SSE with KMS CMK(Customer Managed Keys)
  2-4. SSE-C: a customer-provided key managed by the client, not by S3 or KMS
"""
#1-1, 1-2, 1-3: with bucket default encryption configured, a plain put_object
#is encrypted automatically with whichever default applies
response = s3.put_object(
    Bucket = BUCKET_NAME,
    Key = "test",
    Body = "Encrypted".encode("UTF-8")
)
print("ServerSideEncryption".ljust(20) + f' = {response["ServerSideEncryption"]}')
#For KMS only: check whether the key is AWS-managed or customer-managed
if response["ServerSideEncryption"] == "aws:kms":
    KeyManager = kms.describe_key(
        KeyId = response["SSEKMSKeyId"]
    )["KeyMetadata"]["KeyManager"]
    print(f"KeyManager".ljust(20) + f" = {KeyManager}")


#2-1. SSE with AES-256
response = s3.put_object(
    Bucket = BUCKET_NAME,
    Key = "test",
    Body = "Encrypted".encode("UTF-8"),
    ServerSideEncryption = "AES256"
)
#2-2. SSE with KMS AWS Managed Keys
response = s3.put_object(
    Bucket = BUCKET_NAME,
    Key = "test",
    Body = "Encrypted".encode("UTF-8"),
    ServerSideEncryption = "aws:kms"
)
#2-3. SSE with KMS CMK(Customer Managed Keys)
response = s3.put_object(
    Bucket = BUCKET_NAME,
    Key = "test",
    Body = "Encrypted".encode("UTF-8"),
    ServerSideEncryption = "aws:kms",
    SSEKMSKeyId = "Your Customer Manged Key ID"
)
#2-4. SSE-C: a customer-provided key managed by the client, not by S3 or KMS
response = s3.put_object(
    Bucket = BUCKET_NAME,
    Key = "test",
    Body = "Encrypted".encode("UTF-8"),
    SSECustomerAlgorithm = "AES256",
    SSECustomerKey = "The Key You generated. ex) SSE_CUSTOMER_KEY=$(cat /dev/urandom | base64 -i | fold -w 32 | head -n 1)"
)
