API Boto3 (manipuler les ressources AWS avec la bibliothèque Python) qui est souvent utilisée en privé

*** Voici quelques-unes des API répertoriées dans le "AWS SDK for Python" que j'utilise souvent, liées à boto3 (bibliothèque Python). *** *** https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/index.html

*** Veuillez lire le nom de la ressource et les différentes valeurs de réglage, le cas échéant ~ ***

Athena

# -*- coding: utf-8 -*-
import boto3
import time
athena = boto3.client("athena")

class QueryFailed(Exception):
    """
Classe d'exception appelée lorsque l'exécution de la requête Athena échoue
    """
    pass

#Exécution de requêtes (traitement asynchrone)
start_query_response = athena.start_query_execution(
    QueryString = 'Requete',
    QueryExecutionContext={
        "Database": "Nom GlueDB utilisé pour interroger"
    },
    ResultConfiguration={
        "OutputLocation" : "s3://Nom du compartiment de sortie/Clé"
    }
)
query_execution_id = start_query_response["QueryExecutionId"]
#Vérifier l'état d'exécution de la requête
while True:
    query_status = athena.get_query_execution(
        QueryExecutionId = query_execution_id
    )
    query_execution_status = query_status['QueryExecution']['Status']['State']
    if query_execution_status == 'SUCCEEDED':
        break
    elif query_execution_status == "FAILED" or query_execution_status == "CANCELLED":
        raise QueryFailed(f"query_execution_status = {query_execution_status}")
        time.sleep(10)
#Obtenir le résultat de l'exécution de la requête (uniquement en cas de succès)
query_results = athena.get_query_results(QueryExecutionId = query_execution_id)

CloudWatchLogs


# -*- coding: utf-8 -*-
from datetime import datetime,timezone,timedelta
import os
import boto3
import time
import logging
import traceback

#Paramètres du journal
logger = logging.getLogger()
logger.setLevel(os.getenv('LOG_LEVEL', logging.DEBUG))
logs = boto3.client("logs")
BUCKET_NAME = os.environ["BUCKET_NAME"]
WAITING_TIME = int(os.environ["WAITING_TIME"])

#Définir le fuseau horaire sur l'heure du Japon (JST)
JST = timezone(timedelta(hours=9),"JST")

#Type de date lors de la sortie des journaux vers S3
DATE_FORMAT = "%Y-%m-%d"

def lambda_handler(event, context):
    """
Exportez l'équivalent d'une journée de CloudWatch Logs vers S3.
Le temps cible est le suivant.
    AM 00:00:00.000000 ~ PM 23:59:59.999999
    """
    try:
        #Hier PM23:59:59.999999
        tmp_today = datetime.now(JST).replace(hour=0,minute=0,second=0,microsecond=0) - timedelta(microseconds=1)
        #Hier AM00:00:00.000000
        tmp_yesterday = (tmp_today - timedelta(days=1)) + timedelta(microseconds=1)
        #Utilisé comme préfixe lors de la sortie des journaux S3
        target_date = tmp_yesterday.strftime(DATE_FORMAT)
        #Convertir en type d'horodatage pour la sortie du journal (prendre jusqu'à microsecondes)
        today = int(tmp_today.timestamp() * 1000)
        yesterday = int(tmp_yesterday.timestamp() * 1000)


        #Obtenir CloudWatchLogGroup à partir de la variable d'environnement
        logGroups = os.environ["LOG_GROUPS"].split(",")
        for logGroupName in logGroups:
            try:
                keys = ["logGroupName","yesterday","today","target_date"]
                values = [logGroupName,yesterday,today,target_date]
                payload = dict(zip(keys,values))

                #Exécuter la sortie du journal
                response = logs.create_export_task(
                    logGroupName = payload["logGroupName"],
                    fromTime = payload["yesterday"],
                    to = payload["today"],
                    destination = BUCKET_NAME,
                    destinationPrefix = "Logs" + payload["logGroupName"] + "/" + payload["target_date"]
                )

                #Attendez que la sortie du journal se termine.
                taskId = response["taskId"]
                while True:
                    response = logs.describe_export_tasks(
                        taskId = taskId
                    )
                    status = response["exportTasks"][0]["status"]["code"]
                    #Pause si l'exécution de la tâche est terminée
                    if status != "PENDING" and status != "PENDING_CANCEL" and status != "RUNNING":
                        logger.info(f"taskId {taskId} has finished exporting")
                        break
                    else:
                        logger.info(f"taskId {taskId} is now exporting")
                        time.sleep(WAITING_TIME)
                        continue

            except Exception as e:
                traceback.print_exc()
                logger.warning(f"type = {type(e)} , message = {e}",exc_info=True)

    except Exception as e:
        traceback.print_exc()
        logger.error(f"type = {type(e)} , message = {e}",exc_info=True)
        raise

DynamoDB

# -*- coding: utf-8 -*-
import boto3
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("table")
dynamodb_client = boto3.client('dynamodb')
autoscaling_client = boto3.client('application-autoscaling')

#Obtenez 1 article
response = table.get_item(
    Key = {
        "id" : "1"
    }
)
#Enregistrer un article
response = table.put_item(
    Item = {
        "id" : "1",
        "key" : "value"
    }
)
#Mis à jour 1 article
response = table.update_item(
    Key = {
        "id" : "1"
    },
    UpdateExpression = "set key2 = :val",
    ExpressionAttributeValues = {
        ":val" : "value2"
    },
    ReturnValues = "UPDATED_NEW"
)
#Supprimer 1 élément
response = table.delete_item(
    Key = {
        "id" : "1"
    }
)


#Supprimer tous les éléments (tous les enregistrements) tronquer ou supprimer de la table
#Obtenez toutes les données
delete_items = []
parameters   = {}
while True:
    response = table.scan(**parameters)
    delete_items.extend(response["Items"])
    if "LastEvaluatedKey" in response:
        parameters["ExclusiveStartKey"] = response["LastEvaluatedKey"]
    else:
        break
#Extraction de clé
key_names = [ x["AttributeName"] for x in table.key_schema ]
delete_keys = [ { k:v for k,v in x.items() if k in key_names } for x in delete_items ]
#Suprimmer les données
with table.batch_writer() as batch:
    for key in delete_keys:
        batch.delete_item(Key = key)



#Supprimer la table
response = dynamodb_client.list_tables()
if 'TableNames' in response:
    for table_name in response['TableNames']:
        if table_name == "Nom de la table à supprimer":
            dynamodb_client.delete_table(TableName = table_name)
            waiter = dynamodb_client.get_waiter("table_not_exists")
            waiter.wait(TableName = table_name)
            #Si vous avez le type de suivi de cible Auto Scaling, vous pouvez supprimer CloudWatch Alarm en même temps en supprimant ScalingPolicy.
            try:
                autoscaling_client.delete_scaling_policy(
                    PolicyName = f'{table_name}ReadCapacity',
                    ServiceNamespace = "dynamodb",
                    ResourceId = f"table/{table_name}",
                    ScalableDimension = "dynamodb:table:ReadCapacityUnits"
                )
            except autoscaling_client.exceptions.ObjectNotFoundException as e:
                print(f"type = {type(e)}, message = {e}")
            try:
                autoscaling_client.delete_scaling_policy(
                    PolicyName = f'{table_name}WriteCapacity',
                    ServiceNamespace = "dynamodb",
                    ResourceId = f"table/{table_name}",
                    ScalableDimension = "dynamodb:table:WriteCapacityUnits"
                )
            except autoscaling_client.exceptions.ObjectNotFoundException as e:
                print(f"type = {type(e)}, message = {e}")



#Créer une table
table_name = "table"
dynamodb.create_table(
    TableName = table_name,
    KeySchema = [{
        "AttributeName" : "id",
        "KeyType" : "HASH"
    }],
    AttributeDefinitions = [{
        "AttributeName" : "id",
        "AttributeType" : "S"
    }],
    ProvisionedThroughput = {
        "ReadCapacityUnits" : 1,
        "WriteCapacityUnits" : 1
    }
)
waiter = dynamodb_client.get_waiter("table_exists")
waiter.wait(TableName = table_name)
#Paramètres de mise à l'échelle automatique du type de suivi de la cible
autoscaling_client.register_scalable_target(
    ServiceNamespace = "dynamodb",
    ResourceId = f"table/{table_name}",
    ScalableDimension = "dynamodb:table:ReadCapacityUnits",
    MinCapacity = 1,
    MaxCapacity = 10,
    RoleARN = "ARN de rôle IAM pour Auto Scaling"
)
autoscaling_client.register_scalable_target(
    ServiceNamespace = "dynamodb",
    ResourceId = f"table/{table_name}",
    ScalableDimension = "dynamodb:table:WriteCapacityUnits",
    MinCapacity = 1,
    MaxCapacity = 10,
    RoleARN = "ARN de rôle IAM pour Auto Scaling"
)
#Définir la politique de mise à l'échelle
autoscaling_client.put_scaling_policy(
    ServiceNamespace='dynamodb',
    ResourceId = f"table/{table_name}",
    PolicyType = "TargetTrackingScaling",
    PolicyName = f"{table_name}ReadCapacity",
    ScalableDimension = "dynamodb:table:ReadCapacityUnits",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue" : 70,
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "DynamoDBReadCapacityUtilization"
        },
       "ScaleOutCooldown" : 70,
       "ScaleInCooldown" : 70
   }
)
autoscaling_client.put_scaling_policy(
    ServiceNamespace='dynamodb',
    ResourceId = f"table/{table_name}",
    PolicyType = "TargetTrackingScaling",
    PolicyName = f"{table_name}WriteCapacity",
    ScalableDimension='dynamodb:table:WriteCapacityUnits',
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue" : 70,
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "DynamoDBWriteCapacityUtilization"
        },
       "ScaleOutCooldown" : 70,
       "ScaleInCooldown" : 70
   }
)



#Mise à jour du schéma de table
response = dynamodb_client.update_table(
    AttributeDefinitions = [
        {
            'AttributeName': 'string',
            'AttributeType': 'S'|'N'|'B'
        },
    ],
    TableName = 'string',
    BillingMode = 'PROVISIONED'|'PAY_PER_REQUEST',
    ProvisionedThroughput = {
        'ReadCapacityUnits': 123,
        'WriteCapacityUnits': 123
    }
)


#Le traitement par lots
table = dynamodb.Table("table")
with table.batch_writer() as batch:
    for i in range(10 ** 6):
        batch.put_item(
            Item = {
                "id" : str(i + 1),
                "key" : f"key{i + 1}"
            }
        )

Kinesis Data Stream

# -*- coding: utf-8 -*-
import boto3
kinesis = boto3.client("kinesis")

#Transmission de données
kinesis.put_records(
    Records = [
        {
            "Data" : b"String",
            "PartitionKey" : "String"
        }
    ],
    StreamName = "Kinesis Data Stream Name"
) 

Lambda

# -*- coding: utf-8 -*-
import boto3
lambda_client = boto3.client('lambda ')

#Exécution Lambda
response = lambda_client.invoke(
    FunctionName = 'Lambda à traiter',
    InvocationType = 'Event'|'RequestResponse'|'DryRun',
    LogType = 'None'|'Tail',
    Payload = b'bytes'|file
)


#Activer ou désactiver les mappages de source d'événements
import time
expected_status = "Enabled" #En cas d'invalidation, désactivé
progressing_status = "Enabling" #En cas d'invalidation, désactivation
Enabled = True #Faux pour invalidation
response = lambda_client.list_event_source_mappings(
    EventSourceArn = "Kinesis ou DynamoDB ARN à traiter",
    FunctionName = "Nom de la fonction Lambda à traiter"
)
if "EventSourceMappings" in response:
    for e in response["EventSourceMappings"]:
        if e["State"] != expected_status or e["State"] != progressing_status:
            response = lambda_client.update_event_source_mapping(
                UUID = e["UUID"],
                FunctionName = e["FunctionArn"],
                Enabled = Enabled,
                BatchSize = 100
            )
        if response["State"] != expected_status:
            while True:
                response = lambda_client.get_event_source_mapping(
                    UUID = e["UUID"]
                )
                if response["State"] == expected_status:
                    break
                time.sleep(10)

SageMaker

# -*- coding: utf-8 -*-
import boto3
sagemaker = boto3.client("sagemaker-runtime")

#Accédez au point de terminaison sagemaker et recevez les résultats de prédiction
response = sagemaker.invoke_endpoint(
    EndpointName = "Nom du point de terminaison SageMaker",
    Body=b'bytes'|file,
    ContentType = 'text/csv', #The MIME type of the input data in the request body.
    Accept = 'application/json' #The desired MIME type of the inference in the response.
)

SQS


# -*- coding: utf-8 -*-
import boto3
sqs = boto3.client('sqs')
QUEUE_URL= "URL de la file d'attente SQS. Vous pouvez le vérifier sur la console."

#Récupérez tous les messages de la file d'attente SQS.
while True:
    sqs_message = sqs.receive_message(
        QueueUrl = QUEUE_URL,
        MaxNumberOfMessages = 10
    )
    if "Messages" in sqs_message:
        for message in sqs_message["Messages"]:
            try:
                print(message)
                #Supprimez le message acquis. Sinon, vous risquez d'obtenir des doublons
                sqs.delete_message(
                    QueueUrl = QUEUE_URL,
                    ReceiptHandle = message["ReceiptHandle"]
                )
            except Exception as e:
                print(f"type = {type(e)} , message = {e}")

S3


# -*- coding: utf-8 -*-
import boto3
s3 = boto3.client('s3')
BUCKET_NAME = "Nom du bucket à traiter"

#1 Écriture d'objet (fichier)
s3.put_object(
    Bucket = BUCKET_NAME,
    Body = "Contenu des données. type str ou type d'octets",
    Key = "Clé S3. s3:://Nom du godet/Répertoires et noms de fichiers suivants"
)
#1 Lire l'objet (fichier)
s3.get_object(
    Bucket = BUCKET_NAME,
    Key = "Clé S3. s3:://Nom du godet/Répertoires et noms de fichiers suivants"
)
#1 Supprimer l'objet (fichier)
s3.delete_object(
    Bucket = BUCKET_NAME,
    Key = "Clé S3. s3:://Nom du godet/Répertoires et noms de fichiers suivants"
)
#Copier un objet (fichier) dans un autre emplacement
s3.copy_object(
    Bucket = BUCKET_NAME,
    Key = "Clé de destination S3. s3:://Nom du godet/Répertoires et noms de fichiers suivants",
    CopySource = {
        "Bucket" : "Nom du compartiment source",
        "Key" : "Clé S3 de la source de migration"
    }
)


#Obtenir tous les objets (fichiers) sous le préfixe spécifié ou dans le compartiment
BUCKET_NAME= "Nom du bucket à traiter"
contents = []
kwargs = {
    "Bucket" : BUCKET_NAME,
    "Prefix" : "Préfixe de la cible de recherche"
} 
while True:
    response = s3.list_objects_v2(**kwargs)
    if "Contents" in response:
        contents.extend(response["Contents"])
        if 'NextContinuationToken' in response:
            kwargs["ContinuationToken"] = response['NextContinuationToken']
            continue
    break

Recommended Posts

API Boto3 (manipuler les ressources AWS avec la bibliothèque Python) qui est souvent utilisée en privé
Résumé de la grammaire souvent oubliée avec matplotlib
LINE BOT avec Python + AWS Lambda + API Gateway
Informations d'identification utilisées par Boto3 (AWS SDK for Python)
Manipulation de chaînes avec python et pandas que j'utilise souvent
Notez qu'écrire comme ça avec ruby c'est écrire comme ça avec python
Manipuler des objets S3 avec Boto3 (API de haut niveau et API de bas niveau)
Notes sur les connaissances Python utilisables avec AtCoder
Afficher uniquement les ressources créées lors de l'acquisition de ressources AWS avec Boto3
[AWS] Essayez d'ajouter la bibliothèque Python à la couche avec SAM + Lambda (Python)
[Python] Transformez les ressources AWS simulées avec Moto en appareils Pytest
Créez rapidement une API avec Python, lambda et API Gateway à l'aide d'AWS SAM