Boto3-API (AWS-Ressourcen mit Python-Bibliothek bearbeiten), die häufig privat verwendet wird

*** Im Folgenden sind einige der APIs aufgeführt, die im "AWS SDK für Python" aufgeführt sind und die ich häufig verwende und die sich auf boto3 (Python-Bibliothek) beziehen. *** *** https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/index.html

*** Bitte lesen Sie den Ressourcennamen und die entsprechenden Einstellungswerte entsprechend ~ ***

Athena

# -*- coding: utf-8 -*-
import boto3
import time
athena = boto3.client("athena")

class QueryFailed(Exception):
    """
Ausnahmeklasse, die aufgerufen wird, wenn die Ausführung der Athena-Abfrage fehlschlägt
    """
    pass

#Abfrageausführung (asynchrone Verarbeitung)
start_query_response = athena.start_query_execution(
    QueryString = 'Abfrage',
    QueryExecutionContext={
        "Database": "GlueDB-Name, der zum Abfragen verwendet wird"
    },
    ResultConfiguration={
        "OutputLocation" : "s3://Name des Ausgabe-Buckets/Schlüssel"
    }
)
query_execution_id = start_query_response["QueryExecutionId"]
#Überprüfen Sie den Ausführungsstatus der Abfrage
while True:
    query_status = athena.get_query_execution(
        QueryExecutionId = query_execution_id
    )
    query_execution_status = query_status['QueryExecution']['Status']['State']
    if query_execution_status == 'SUCCEEDED':
        break
    elif query_execution_status == "FAILED" or query_execution_status == "CANCELLED":
        raise QueryFailed(f"query_execution_status = {query_execution_status}")
        time.sleep(10)
#Ergebnis der Abfrageausführung abrufen (nur bei Erfolg)
query_results = athena.get_query_results(QueryExecutionId = query_execution_id)

CloudWatchLogs


# -*- coding: utf-8 -*-
from datetime import datetime,timezone,timedelta
import os
import boto3
import time
import logging
import traceback

#Protokolleinstellungen
logger = logging.getLogger()
logger.setLevel(os.getenv('LOG_LEVEL', logging.DEBUG))
logs = boto3.client("logs")
BUCKET_NAME = os.environ["BUCKET_NAME"]
WAITING_TIME = int(os.environ["WAITING_TIME"])

#Stellen Sie die Zeitzone auf Japanische Zeit (JST) ein.
JST = timezone(timedelta(hours=9),"JST")

#Datumstyp bei der Ausgabe von Protokollen an S3
DATE_FORMAT = "%Y-%m-%d"

def lambda_handler(event, context):
    """
Ausgabe von CloudWatch-Protokollen im Wert von einem Tag an S3.
Die Zielzeit ist wie folgt.
    AM 00:00:00.000000 ~ PM 23:59:59.999999
    """
    try:
        #Gestern PM23:59:59.999999
        tmp_today = datetime.now(JST).replace(hour=0,minute=0,second=0,microsecond=0) - timedelta(microseconds=1)
        #Gestern AM00:00:00.000000
        tmp_yesterday = (tmp_today - timedelta(days=1)) + timedelta(microseconds=1)
        #Wird als Präfix bei der Ausgabe von S3-Protokollen verwendet
        target_date = tmp_yesterday.strftime(DATE_FORMAT)
        #Für die Protokollausgabe in Zeitstempeltyp konvertieren (bis zu Mikrosekunden dauern)
        today = int(tmp_today.timestamp() * 1000)
        yesterday = int(tmp_yesterday.timestamp() * 1000)


        #Holen Sie sich CloudWatchLogGroup von der Umgebungsvariablen
        logGroups = os.environ["LOG_GROUPS"].split(",")
        for logGroupName in logGroups:
            try:
                keys = ["logGroupName","yesterday","today","target_date"]
                values = [logGroupName,yesterday,today,target_date]
                payload = dict(zip(keys,values))

                #Protokollausgabe ausführen
                response = logs.create_export_task(
                    logGroupName = payload["logGroupName"],
                    fromTime = payload["yesterday"],
                    to = payload["today"],
                    destination = BUCKET_NAME,
                    destinationPrefix = "Logs" + payload["logGroupName"] + "/" + payload["target_date"]
                )

                #Warten Sie, bis die Protokollausgabe abgeschlossen ist.
                taskId = response["taskId"]
                while True:
                    response = logs.describe_export_tasks(
                        taskId = taskId
                    )
                    status = response["exportTasks"][0]["status"]["code"]
                    #Unterbrechen Sie die Ausführung der Aufgabe
                    if status != "PENDING" and status != "PENDING_CANCEL" and status != "RUNNING":
                        logger.info(f"taskId {taskId} has finished exporting")
                        break
                    else:
                        logger.info(f"taskId {taskId} is now exporting")
                        time.sleep(WAITING_TIME)
                        continue

            except Exception as e:
                traceback.print_exc()
                logger.warning(f"type = {type(e)} , message = {e}",exc_info=True)

    except Exception as e:
        traceback.print_exc()
        logger.error(f"type = {type(e)} , message = {e}",exc_info=True)
        raise

DynamoDB

# -*- coding: utf-8 -*-
import boto3
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("table")
dynamodb_client = boto3.client('dynamodb')
autoscaling_client = boto3.client('application-autoscaling')

#Holen Sie sich 1 Artikel
response = table.get_item(
    Key = {
        "id" : "1"
    }
)
#Registrieren Sie einen Artikel
response = table.put_item(
    Item = {
        "id" : "1",
        "key" : "value"
    }
)
#1 Artikel aktualisiert
response = table.update_item(
    Key = {
        "id" : "1"
    },
    UpdateExpression = "set key2 = :val",
    ExpressionAttributeValues = {
        ":val" : "value2"
    },
    ReturnValues = "UPDATED_NEW"
)
#Löschen Sie einen Eintrag
response = table.delete_item(
    Key = {
        "id" : "1"
    }
)


#Alle Elemente löschen (alle Datensätze) abschneiden oder aus der Tabelle löschen
#Holen Sie sich alle Daten
delete_items = []
parameters   = {}
while True:
    response = table.scan(**parameters)
    delete_items.extend(response["Items"])
    if "LastEvaluatedKey" in response:
        parameters["ExclusiveStartKey"] = response["LastEvaluatedKey"]
    else:
        break
#Schlüsselextraktion
key_names = [ x["AttributeName"] for x in table.key_schema ]
delete_keys = [ { k:v for k,v in x.items() if k in key_names } for x in delete_items ]
#Daten löschen
with table.batch_writer() as batch:
    for key in delete_keys:
        batch.delete_item(Key = key)



#Tabelle löschen
response = dynamodb_client.list_tables()
if 'TableNames' in response:
    for table_name in response['TableNames']:
        if table_name == "Tabellenname, der gelöscht werden soll":
            dynamodb_client.delete_table(TableName = table_name)
            waiter = dynamodb_client.get_waiter("table_not_exists")
            waiter.wait(TableName = table_name)
            #Wenn Sie über die automatische Skalierung des Zielverfolgungstyps verfügen, können Sie CloudWatch Alarm gleichzeitig löschen, indem Sie ScalingPolicy löschen.
            try:
                autoscaling_client.delete_scaling_policy(
                    PolicyName = f'{table_name}ReadCapacity',
                    ServiceNamespace = "dynamodb",
                    ResourceId = f"table/{table_name}",
                    ScalableDimension = "dynamodb:table:ReadCapacityUnits"
                )
            except autoscaling_client.exceptions.ObjectNotFoundException as e:
                print(f"type = {type(e)}, message = {e}")
            try:
                autoscaling_client.delete_scaling_policy(
                    PolicyName = f'{table_name}WriteCapacity',
                    ServiceNamespace = "dynamodb",
                    ResourceId = f"table/{table_name}",
                    ScalableDimension = "dynamodb:table:WriteCapacityUnits"
                )
            except autoscaling_client.exceptions.ObjectNotFoundException as e:
                print(f"type = {type(e)}, message = {e}")



#Tabelle erstellen
table_name = "table"
dynamodb.create_table(
    TableName = table_name,
    KeySchema = [{
        "AttributeName" : "id",
        "KeyType" : "HASH"
    }],
    AttributeDefinitions = [{
        "AttributeName" : "id",
        "AttributeType" : "S"
    }],
    ProvisionedThroughput = {
        "ReadCapacityUnits" : 1,
        "WriteCapacityUnits" : 1
    }
)
waiter = dynamodb_client.get_waiter("table_exists")
waiter.wait(TableName = table_name)
#Einstellungen für die automatische Skalierung des Zielverfolgungstyps
autoscaling_client.register_scalable_target(
    ServiceNamespace = "dynamodb",
    ResourceId = f"table/{table_name}",
    ScalableDimension = "dynamodb:table:ReadCapacityUnits",
    MinCapacity = 1,
    MaxCapacity = 10,
    RoleARN = "IAM-Rollen-ARN für die automatische Skalierung"
)
autoscaling_client.register_scalable_target(
    ServiceNamespace = "dynamodb",
    ResourceId = f"table/{table_name}",
    ScalableDimension = "dynamodb:table:WriteCapacityUnits",
    MinCapacity = 1,
    MaxCapacity = 10,
    RoleARN = "IAM-Rollen-ARN für die automatische Skalierung"
)
#Skalierungsrichtlinie festlegen
autoscaling_client.put_scaling_policy(
    ServiceNamespace='dynamodb',
    ResourceId = f"table/{table_name}",
    PolicyType = "TargetTrackingScaling",
    PolicyName = f"{table_name}ReadCapacity",
    ScalableDimension = "dynamodb:table:ReadCapacityUnits",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue" : 70,
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "DynamoDBReadCapacityUtilization"
        },
       "ScaleOutCooldown" : 70,
       "ScaleInCooldown" : 70
   }
)
autoscaling_client.put_scaling_policy(
    ServiceNamespace='dynamodb',
    ResourceId = f"table/{table_name}",
    PolicyType = "TargetTrackingScaling",
    PolicyName = f"{table_name}WriteCapacity",
    ScalableDimension='dynamodb:table:WriteCapacityUnits',
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue" : 70,
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "DynamoDBWriteCapacityUtilization"
        },
       "ScaleOutCooldown" : 70,
       "ScaleInCooldown" : 70
   }
)



#Aktualisierung des Tabellenschemas
response = dynamodb_client.update_table(
    AttributeDefinitions = [
        {
            'AttributeName': 'string',
            'AttributeType': 'S'|'N'|'B'
        },
    ],
    TableName = 'string',
    BillingMode = 'PROVISIONED'|'PAY_PER_REQUEST',
    ProvisionedThroughput = {
        'ReadCapacityUnits': 123,
        'WriteCapacityUnits': 123
    }
)


#Stapelverarbeitung
table = dynamodb.Table("table")
with table.batch_writer() as batch:
    for i in range(10 ** 6):
        batch.put_item(
            Item = {
                "id" : str(i + 1),
                "key" : f"key{i + 1}"
            }
        )

Kinesis Data Stream

# -*- coding: utf-8 -*-
import boto3
kinesis = boto3.client("kinesis")

#Datenübertragung
kinesis.put_records(
    Records = [
        {
            "Data" : b"String",
            "PartitionKey" : "String"
        }
    ],
    StreamName = "Kinesis Data Stream Name"
) 

Lambda

# -*- coding: utf-8 -*-
import boto3
lambda_client = boto3.client('lambda ')

#Lambda rennen
response = lambda_client.invoke(
    FunctionName = 'Lambda verarbeitet werden',
    InvocationType = 'Event'|'RequestResponse'|'DryRun',
    LogType = 'None'|'Tail',
    Payload = b'bytes'|file
)


#Ereignisquellenzuordnungen aktivieren oder deaktivieren
import time
expected_status = "Enabled" #Im Falle einer Ungültigmachung deaktiviert
progressing_status = "Enabling" #Im Falle einer Ungültigmachung deaktivieren
Enabled = True #Falsch für die Ungültigmachung
response = lambda_client.list_event_source_mappings(
    EventSourceArn = "Kinesis oder DynamoDB ARN werden verarbeitet",
    FunctionName = "Name der zu verarbeitenden Lambda-Funktion"
)
if "EventSourceMappings" in response:
    for e in response["EventSourceMappings"]:
        if e["State"] != expected_status or e["State"] != progressing_status:
            response = lambda_client.update_event_source_mapping(
                UUID = e["UUID"],
                FunctionName = e["FunctionArn"],
                Enabled = Enabled,
                BatchSize = 100
            )
        if response["State"] != expected_status:
            while True:
                response = lambda_client.get_event_source_mapping(
                    UUID = e["UUID"]
                )
                if response["State"] == expected_status:
                    break
                time.sleep(10)

SageMaker

# -*- coding: utf-8 -*-
import boto3
sagemaker = boto3.client("sagemaker-runtime")

#Greifen Sie auf den Sagemaker-Endpunkt zu und erhalten Sie die Vorhersageergebnisse
response = sagemaker.invoke_endpoint(
    EndpointName = "Name des SageMaker-Endpunkts",
    Body=b'bytes'|file,
    ContentType = 'text/csv', #The MIME type of the input data in the request body.
    Accept = 'application/json' #The desired MIME type of the inference in the response.
)

SQS


# -*- coding: utf-8 -*-
import boto3
sqs = boto3.client('sqs')
QUEUE_URL= "URL der SQS-Warteschlange. Sie können es auf der Konsole überprüfen."

#Holen Sie sich alle Nachrichten aus der SQS-Warteschlange.
while True:
    sqs_message = sqs.receive_message(
        QueueUrl = QUEUE_URL,
        MaxNumberOfMessages = 10
    )
    if "Messages" in sqs_message:
        for message in sqs_message["Messages"]:
            try:
                print(message)
                #Löschen Sie die erfasste Nachricht. Andernfalls erhalten Sie möglicherweise Duplikate
                sqs.delete_message(
                    QueueUrl = QUEUE_URL,
                    ReceiptHandle = message["ReceiptHandle"]
                )
            except Exception as e:
                print(f"type = {type(e)} , message = {e}")

S3


# -*- coding: utf-8 -*-
import boto3
s3 = boto3.client('s3')
BUCKET_NAME = "Zu verarbeitender Bucket-Name"

#1 Objekt (Datei) schreiben
s3.put_object(
    Bucket = BUCKET_NAME,
    Body = "Dateninhalt. str-Typ oder Byte-Typ",
    Key = "S3-Taste. s3:://Eimername/Nachfolgende Verzeichnisse und Dateinamen"
)
#1 Objekt (Datei) lesen
s3.get_object(
    Bucket = BUCKET_NAME,
    Key = "S3-Taste. s3:://Eimername/Nachfolgende Verzeichnisse und Dateinamen"
)
#1 Objekt (Datei löschen)
s3.delete_object(
    Bucket = BUCKET_NAME,
    Key = "S3-Taste. s3:://Eimername/Nachfolgende Verzeichnisse und Dateinamen"
)
#Kopieren Sie ein Objekt (eine Datei) an einen anderen Speicherort
s3.copy_object(
    Bucket = BUCKET_NAME,
    Key = "Ziel S3-Taste. s3:://Eimername/Nachfolgende Verzeichnisse und Dateinamen",
    CopySource = {
        "Bucket" : "Name des Quell-Buckets",
        "Key" : "S3-Schlüssel der Migrationsquelle"
    }
)


#Holen Sie sich alle Objekte (Dateien) unter dem angegebenen Präfix oder in den Bucket
BUCKET_NAME= "Zu verarbeitender Bucket-Name"
contents = []
kwargs = {
    "Bucket" : BUCKET_NAME,
    "Prefix" : "Suchzielpräfix"
} 
while True:
    response = s3.list_objects_v2(**kwargs)
    if "Contents" in response:
        contents.extend(response["Contents"])
        if 'NextContinuationToken' in response:
            kwargs["ContinuationToken"] = response['NextContinuationToken']
            continue
    break

Recommended Posts

Boto3-API (AWS-Ressourcen mit Python-Bibliothek bearbeiten), die häufig privat verwendet wird
Zusammenfassung der Grammatik, die bei matplotlib oft vergessen wird
LINE BOT mit Python + AWS Lambda + API Gateway
Von Boto3 verwendete Anmeldeinformationen (AWS SDK für Python)
String-Manipulation mit Python & Pandas, die ich oft benutze
Beachten Sie, dass das Schreiben mit Ruby so ist wie das Schreiben mit Python
Bearbeiten Sie S3-Objekte mit Boto3 (High-Level-API und Low-Level-API)
Hinweise zu Python-Kenntnissen, die mit AtCoder verwendet werden können
Zeigen Sie nur die Ressourcen an, die beim Erwerb von AWS-Ressourcen mit Boto3 erstellt wurden
[AWS] Versuchen Sie, die Python-Bibliothek mit SAM + Lambda (Python) zur Ebene hinzuzufügen.
[Python] Machen Sie mit Moto verspottete AWS-Ressourcen zu Pytest-Fixtures
Erstellen Sie mit AWS SAM schnell eine API mit Python, Lambda und API Gateway