Boto3 (Python) API examples I use often

Below are some of the APIs from the "AWS SDK for Python" (boto3) that I use frequently: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/index.html

Please substitute the resource names and the corresponding setting values to match your own environment.

Athena

# -*- coding: utf-8 -*-
import boto3
import time
athena = boto3.client("athena")

class QueryFailed(Exception):
    """
Ausnahmeklasse, die aufgerufen wird, wenn die Ausführung der Athena-Abfrage fehlschlägt
    """
    pass

#Run the query as an async process
start_query_response = athena.start_query_execution(
    QueryString = 'your query',
    QueryExecutionContext={
        "Database": "Your GlueDB name"
    },
    ResultConfiguration={
        "OutputLocation" : "s3://xxx/yyy"
    }
)
query_execution_id = start_query_response["QueryExecutionId"]
#Check the query status
while True:
    query_status = athena.get_query_execution(
        QueryExecutionId = query_execution_id
    )
    query_execution_status = query_status['QueryExecution']['Status']['State']
    if query_execution_status == 'SUCCEEDED':
        break
    elif query_execution_status == "FAILED" or query_execution_status == "CANCELLED":
        raise QueryFailed(f"query_execution_status = {query_execution_status}")
    else:
        time.sleep(10)
#Get the query result. This only works once the query has succeeded
query_results = athena.get_query_results(QueryExecutionId = query_execution_id)
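
If you need the actual rows, each result row comes back as a list of "VarCharValue" entries; a minimal sketch of reading them via the paginator (the header handling is an assumption, adjust it to your query):

#Read all rows of the result (the first row usually contains the column headers)
paginator = athena.get_paginator("get_query_results")
rows = []
for page in paginator.paginate(QueryExecutionId = query_execution_id):
    for row in page["ResultSet"]["Rows"]:
        rows.append([col.get("VarCharValue") for col in row["Data"]])
header, records = rows[0], rows[1:]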

CloudWatchLogs

CloudWatch Logs example

# -*- coding: utf-8 -*-
from datetime import datetime,timezone,timedelta
import os
import boto3
import time
from logging import getLogger, StreamHandler, DEBUG, INFO, WARNING, ERROR, CRITICAL
import traceback

#logger setting
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(DEBUG)
logger.setLevel(os.getenv("LogLevel", DEBUG))
logger.addHandler(handler)
logger.propagate = False

logs = boto3.client("logs")
BUCKET_NAME = os.environ["BUCKET_NAME"]
WAITING_TIME = int(os.environ["WAITING_TIME"])

#define timezone as JST
JST = timezone(timedelta(hours=9),"JST")

#DateFormat when you log out into S3
DATE_FORMAT = "%Y-%m-%d"

def lambda_handler(event, context):
    """
Ausgabe von CloudWatch-Protokollen im Wert von einem Tag an S3.
Die Zielzeit ist wie folgt.
    AM 00:00:00.000000 ~ PM 23:59:59.999999
    """
    try:
        #Yesterday 23:59:59.999999 (JST)
        tmp_today = datetime.now(JST).replace(hour=0,minute=0,second=0,microsecond=0) - timedelta(microseconds=1)
        #Yesterday 00:00:00.000000 (JST)
        tmp_yesterday = (tmp_today - timedelta(days=1)) + timedelta(microseconds=1)
        #Used as the prefix when exporting the logs to S3
        target_date = tmp_yesterday.strftime(DATE_FORMAT)
        #Convert to epoch milliseconds for the export task
        today = int(tmp_today.timestamp() * 1000)
        yesterday = int(tmp_yesterday.timestamp() * 1000)


        #Get the CloudWatchLogGroups from your environment variable
        logGroups = os.environ["LOG_GROUPS"].split(",")
        for logGroupName in logGroups:
            try:
                keys = ["logGroupName","yesterday","today","target_date"]
                values = [logGroupName,yesterday,today,target_date]
                payload = dict(zip(keys,values))

                #Output CloudWatchLogs into S3 (Async Process)
                response = logs.create_export_task(
                    logGroupName = payload["logGroupName"],
                    fromTime = payload["yesterday"],
                    to = payload["today"],
                    destination = BUCKET_NAME,
                    destinationPrefix = "Logs" + payload["logGroupName"] + "/" + payload["target_date"]
                )
                
                taskId = response["taskId"]
                while True:
                    response = logs.describe_export_tasks(
                        taskId = taskId
                    )
                    status = response["exportTasks"][0]["status"]["code"]
                    if status != "PENDING" and status != "PENDING_CANCEL" and status != "RUNNING":
                        logger.info(f"taskId {taskId} has finished exporting")
                        break
                    else:
                        logger.info(f"taskId {taskId} is now exporting")
                        time.sleep(WAITING_TIME)
                        continue

            except Exception as e:
                traceback.print_exc()
                logger.warning(f"type = {type(e)} , message = {e}",exc_info=True)

    except Exception as e:
        traceback.print_exc()
        logger.error(f"type = {type(e)} , message = {e}",exc_info=True)
        raise

DynamoDB

DynamoDB example

# -*- coding: utf-8 -*-
import boto3
from boto3.dynamodb.conditions import Key,Attr

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("table")
dynamodb_client = boto3.client('dynamodb')
autoscaling_client = boto3.client('application-autoscaling')

"""
DML  select update insert delete
"""
#get a record
response = table.get_item(
    Key = {
        "id" : "1"
    }
)
#write a record
response = table.put_item(
    Item = {
        "id" : "1",
        "key" : "value"
    }
)
#update a record
response = table.update_item(
    Key = {
        "id" : "1"
    },
    UpdateExpression = "set #age = :val",
    ExpressionAttributeNames = {
        "#age" : "age"
    },
    ExpressionAttributeValues = {
        ":val" : 95
    },
    ReturnValues = "UPDATED_NEW"
)
#delete a record
response = table.delete_item(
    Key = {
        "id" : "1"
    }
)
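
The conditions helpers imported above (Key and Attr) are meant for query and scan filters; a minimal sketch, assuming the partition key is "id" and that the items carry an "age" attribute:

#query by partition key and filter on a non-key attribute
response = table.query(
    KeyConditionExpression = Key("id").eq("1"),
    FilterExpression = Attr("age").gte(20)
)
items = response["Items"]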



#write records as a batch process
with table.batch_writer() as batch:
    for i in range(10 ** 6):
        batch.put_item(
            Item = {
                "id" : str(i + 1),
                "key" : f"key{i + 1}"
            }
        )


# truncate: delete all items from the table
delete_items = []
parameters   = {}
while True:
    response = table.scan(**parameters)
    delete_items.extend(response["Items"])
    if "LastEvaluatedKey" in response:
        parameters["ExclusiveStartKey"] = response["LastEvaluatedKey"]
    else:
        break
# get the hash key and range key
key_names = [ x["AttributeName"] for x in table.key_schema ]
delete_keys = [ { k:v for k,v in x.items() if k in key_names } for x in delete_items ]
#delete the items as a batch process
with table.batch_writer() as batch:
    for key in delete_keys:
        batch.delete_item(Key = key)



"""
DDL  Table Process
"""
#Delete a table
response = dynamodb_client.list_tables()
if 'TableNames' in response:
    for table_name in response['TableNames']:
        if table_name == "name of the table to delete":
            dynamodb_client.delete_table(TableName = table_name)
            waiter = dynamodb_client.get_waiter("table_not_exists")
            waiter.wait(TableName = table_name)
            #If you use target-tracking auto scaling, deleting the ScalingPolicy also deletes the associated CloudWatch alarms.
            try:
                autoscaling_client.delete_scaling_policy(
                    PolicyName = f'{table_name}ReadCapacity',
                    ServiceNamespace = "dynamodb",
                    ResourceId = f"table/{table_name}",
                    ScalableDimension = "dynamodb:table:ReadCapacityUnits"
                )
            except autoscaling_client.exceptions.ObjectNotFoundException as e:
                print(f"type = {type(e)}, message = {e}")
            try:
                autoscaling_client.delete_scaling_policy(
                    PolicyName = f'{table_name}WriteCapacity',
                    ServiceNamespace = "dynamodb",
                    ResourceId = f"table/{table_name}",
                    ScalableDimension = "dynamodb:table:WriteCapacityUnits"
                )
            except autoscaling_client.exceptions.ObjectNotFoundException as e:
                print(f"type = {type(e)}, message = {e}")



#Create a table
table_name = "table"
dynamodb.create_table(
    TableName = table_name,
    KeySchema = [{
        "AttributeName" : "id",
        "KeyType" : "HASH"
    }],
    AttributeDefinitions = [{
        "AttributeName" : "id",
        "AttributeType" : "S"
    }],
    ProvisionedThroughput = {
        "ReadCapacityUnits" : 1,
        "WriteCapacityUnits" : 1
    }
)
waiter = dynamodb_client.get_waiter("table_exists")
waiter.wait(TableName = table_name)
# Auto scaling: register the scalable targets for target tracking
autoscaling_client.register_scalable_target(
    ServiceNamespace = "dynamodb",
    ResourceId = f"table/{table_name}",
    ScalableDimension = "dynamodb:table:ReadCapacityUnits",
    MinCapacity = 1,
    MaxCapacity = 10,
    RoleARN = "IAM-Rollen-ARN für die automatische Skalierung"
)
autoscaling_client.register_scalable_target(
    ServiceNamespace = "dynamodb",
    ResourceId = f"table/{table_name}",
    ScalableDimension = "dynamodb:table:WriteCapacityUnits",
    MinCapacity = 1,
    MaxCapacity = 10,
    RoleARN = "IAM-Rollen-ARN für die automatische Skalierung"
)
# set up the autoscaling policy
autoscaling_client.put_scaling_policy(
    ServiceNamespace='dynamodb',
    ResourceId = f"table/{table_name}",
    PolicyType = "TargetTrackingScaling",
    PolicyName = f"{table_name}ReadCapacity",
    ScalableDimension = "dynamodb:table:ReadCapacityUnits",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue" : 70,
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "DynamoDBReadCapacityUtilization"
        },
       "ScaleOutCooldown" : 70,
       "ScaleInCooldown" : 70
   }
)
autoscaling_client.put_scaling_policy(
    ServiceNamespace='dynamodb',
    ResourceId = f"table/{table_name}",
    PolicyType = "TargetTrackingScaling",
    PolicyName = f"{table_name}WriteCapacity",
    ScalableDimension='dynamodb:table:WriteCapacityUnits',
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue" : 70,
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "DynamoDBWriteCapacityUtilization"
        },
       "ScaleOutCooldown" : 70,
       "ScaleInCooldown" : 70
   }
)



#update the table schema (the values below are placeholders)
response = dynamodb_client.update_table(
    AttributeDefinitions = [
        {
            'AttributeName': 'string',
            'AttributeType': 'S'      #one of 'S' | 'N' | 'B'
        },
    ],
    TableName = 'string',
    BillingMode = 'PROVISIONED',      #or 'PAY_PER_REQUEST'
    ProvisionedThroughput = {
        'ReadCapacityUnits': 123,
        'WriteCapacityUnits': 123
    }
)

EMR

# -*- coding: utf-8 -*-
import boto3

#EMR
emr = boto3.client("emr")

#show cluster list
cluster_list = emr.list_clusters(
    ClusterStates = ["STARTING","BOOTSTRAPPING","RUNNING","WAITING"]
)

#Set up Cluster
params = {
    #https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.run_job_flow
}
response = emr.run_job_flow(**params)

#describe cluster
response = emr.describe_cluster(
    ClusterId = "The identifier of the cluster to describe."
)


#Add steps
step = {
    #https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.add_job_flow_steps
}
response = emr.add_job_flow_steps(
    JobFlowId = "A string that uniquely identifies the job flow. This identifier is returned by RunJobFlow and can also be obtained from ListClusters .",
    Steps = [step]
)
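
For reference, a minimal sketch of what the two placeholder dicts above could look like; the cluster name, release label, instance types, roles and the Spark step are illustrative assumptions, not values from this article:

#illustrative run_job_flow parameters
params = {
    "Name": "my-cluster",
    "ReleaseLabel": "emr-6.4.0",
    "Applications": [{"Name": "Spark"}],
    "Instances": {
        "MasterInstanceType": "m5.xlarge",
        "SlaveInstanceType": "m5.xlarge",
        "InstanceCount": 3,
        "KeepJobFlowAliveWhenNoSteps": True
    },
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole"
}
#illustrative step that submits a Spark application
step = {
    "Name": "my-spark-step",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": ["spark-submit", "s3://xxx/yyy/app.py"]
    }
}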

Kinesis Data Stream

# -*- coding: utf-8 -*-
import boto3
kinesis = boto3.client("kinesis")

#Send data
kinesis.put_records(
    Records = [
        {
            "Data" : b"String",
            "PartitionKey" : "String"
        }
    ],
    StreamName = "Kinesis Data Stream Name"
) 
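
put_records can fail partially, so it is worth checking FailedRecordCount in the response; a minimal retry sketch using the same record list and stream name as above:

#Retry the records that Kinesis rejected (e.g. due to throttling)
records = [{"Data" : b"String", "PartitionKey" : "String"}]
response = kinesis.put_records(Records = records, StreamName = "Kinesis Data Stream Name")
if response["FailedRecordCount"] > 0:
    failed = [r for r, res in zip(records, response["Records"]) if "ErrorCode" in res]
    kinesis.put_records(Records = failed, StreamName = "Kinesis Data Stream Name")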

Lambda

# -*- coding: utf-8 -*-
import boto3
lambda_client = boto3.client('lambda')

#Invoke Lambda
response = lambda_client.invoke(
    FunctionName = 'Lambda Function Name',
    InvocationType = 'RequestResponse', #or 'Event' (async) or 'DryRun'
    LogType = 'Tail',                   #or 'None'
    Payload = b'bytes'                  #bytes or a file-like object
)
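
When InvocationType is 'RequestResponse', the function's return value can be read from the streaming Payload; a minimal sketch:

import json
#The Payload is a streaming body; read and parse it to get the function's return value
result = json.loads(response["Payload"].read())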


#Event Source Mappings enable or disable
import time
expected_status = "Enabled"     #use "Disabled" when disabling
progressing_status = "Enabling" #use "Disabling" when disabling
Enabled = True                  #use False when disabling
response = lambda_client.list_event_source_mappings(
    EventSourceArn = "Kinesis or DynamoDB ARN",
    FunctionName = "Lambda Function name"
)
if "EventSourceMappings" in response:
    for e in response["EventSourceMappings"]:
        if e["State"] != expected_status or e["State"] != progressing_status:
            response = lambda_client.update_event_source_mapping(
                UUID = e["UUID"],
                FunctionName = e["FunctionArn"],
                Enabled = Enabled,
                BatchSize = 100
            )
        if response["State"] != expected_status:
            while True:
                response = lambda_client.get_event_source_mapping(
                    UUID = e["UUID"]
                )
                if response["State"] == expected_status:
                    break
                time.sleep(10)

SageMaker

# -*- coding: utf-8 -*-
import boto3

sagemaker = boto3.client("sagemaker-runtime")

#Invoke Endpoint and get a predicted result
response = sagemaker.invoke_endpoint(
    EndpointName = "SageMaker Endpoint Name",
    Body = b'bytes',            #bytes or a file-like object with the input data
    ContentType = 'text/csv',   #The MIME type of the input data in the request body.
    Accept = 'application/json' #The desired MIME type of the inference in the response.
)
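
The prediction itself is returned as a streaming body; a minimal sketch of reading it, assuming the endpoint returns JSON (the format depends on your model):

import json
#Read and decode the inference result returned by the endpoint
prediction = json.loads(response["Body"].read().decode("utf-8"))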

SQS

# -*- coding: utf-8 -*-
import boto3

sqs = boto3.client('sqs')
QUEUE_URL= "SQS Queue URL"

#Get all messages from Queue of SQS
while True:
    sqs_message = sqs.receive_message(
        QueueUrl = QUEUE_URL,
        MaxNumberOfMessages = 10
    )
    if "Messages" in sqs_message:
        for message in sqs_message["Messages"]:
            try:
                print(message)
                #After processing a message, delete it; otherwise it becomes visible again and you will receive it once more
                sqs.delete_message(
                    QueueUrl = QUEUE_URL,
                    ReceiptHandle = message["ReceiptHandle"]
                )
            except Exception as e:
                print(f"type = {type(e)} , message = {e}")
    else:
        break
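
Short polling as above can return an empty response even when messages exist; a minimal sketch that enables long polling per request via WaitTimeSeconds:

#Long polling: the call waits up to 20 seconds for messages before returning
sqs_message = sqs.receive_message(
    QueueUrl = QUEUE_URL,
    MaxNumberOfMessages = 10,
    WaitTimeSeconds = 20
)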

SSM

# -*- coding: utf-8 -*-
import boto3

ssm = boto3.client('ssm')
parameters = "your SSM parameter"
response = ssm.get_parameters(
    Names=[
        parameters,
    ],
    #Return decrypted values for secure string parameters. This flag is ignored for String and StringList parameter types.
    WithDecryption = True
)
print(response['Parameters'][0]['Value'])
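
get_parameters silently skips names it cannot resolve and reports them under InvalidParameters; a minimal sketch of the check:

#Names that could not be resolved are returned separately
if response["InvalidParameters"]:
    raise ValueError(f"unknown SSM parameters: {response['InvalidParameters']}")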

S3

S3 example

# -*- coding: utf-8 -*-
import boto3

#kms
kms = boto3.client("kms")

#s3
s3 = boto3.client('s3')

BUCKET_NAME = "Bucket name"

"""
Single-object operations
"""
#Write object
s3.put_object(
    Bucket = BUCKET_NAME,
    Body = "test body".encode("UTF-8"),
    Key = "S3 key  s3://Bucket Name/Nachfolgende Verzeichnisse und Dateinamen"
)

#Read object
s3.get_object(
    Bucket = BUCKET_NAME,
    Key = "S3 key  s3://Bucket Name/Nachfolgende Verzeichnisse und Dateinamen"
)["Body"].read().decode("UTF-8")

#Delete object
s3.delete_object(
    Bucket = BUCKET_NAME,
    Key = "S3 key  s3://Bucket Name/Nachfolgende Verzeichnisse und Dateinamen"
)

#copy object
s3.copy_object(
    Bucket = BUCKET_NAME,
    Key = "Ziel S3-Taste. s3:://Eimername/Nachfolgende Verzeichnisse und Dateinamen",
    CopySource = {
        "Bucket" : "Name des Quell-Buckets",
        "Key" : "S3-Schlüssel der Migrationsquelle"
    }
)
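
copy_object only handles objects up to 5 GB; for larger objects the client's managed copy performs a multipart copy, a minimal sketch:

#Managed copy: transparently uses multipart copy for objects larger than 5 GB
s3.copy(
    CopySource = {"Bucket" : "source bucket name", "Key" : "source S3 key"},
    Bucket = BUCKET_NAME,
    Key = "destination S3 key"
)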

"""
Multi-object operations
"""
#Get all objects under the specified s3 prefix
contents = []
kwargs = {
    "Bucket" : BUCKET_NAME,
    "Prefix" : "Suchzielpräfix"
} 
while True:
    response = s3.list_objects_v2(**kwargs)
    if "Contents" in response:
        contents.extend(response["Contents"])
        if 'NextContinuationToken' in response:
            kwargs["ContinuationToken"] = response['NextContinuationToken']
            continue
    break
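
The same listing can be written with the built-in paginator, which handles the continuation token for you; a minimal sketch:

#Equivalent listing using the list_objects_v2 paginator
paginator = s3.get_paginator("list_objects_v2")
contents = []
for page in paginator.paginate(Bucket = BUCKET_NAME, Prefix = "target prefix to search"):
    contents.extend(page.get("Contents", []))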


"""
Server Side Encryption
1. Default Encryption
  1-1. SSE with AES-256
  1-2. SSE with KMS AWS Managed Keys
  1-3. SSE with KMS CMK(Customer Managed Keys)
2. Non Default Encryption
  2-1. SSE with AES-256
  2-2. SSE with KMS AWS Managed Keys
  2-3. SSE with KMS CMK(Customer Managed Keys)
  2-4. SSE with a customer-provided key (SSE-C); the key is managed by the client, not by S3 or KMS
"""
#1-1. SSE with AES-256
#1-2. SSE with KMS AWS Managed Keys
#1-3. SSE with KMS CMK(Customer Managed Keys)
#With bucket default encryption enabled, a plain put_object picks up whichever default (1-1 to 1-3) is configured
response = s3.put_object(
    Bucket = BUCKET_NAME,
    Key = "test",
    Body = "Encrypted".encode("UTF-8")
)
print(f'ServerSideEncryption'.ljust(20) + f' = {response["ServerSideEncryption"]}')
#Only for KMS encryption: check whether the key is AWS-managed or customer-managed
if response["ServerSideEncryption"] == "aws:kms":
    KeyManager = kms.describe_key(
        KeyId = response["SSEKMSKeyId"]
    )["KeyMetadata"]["KeyManager"]
    print(f"KeyManager".ljust(20) + f" = {KeyManager}")


#2-1. SSE with AES-256
response = s3.put_object(
    Bucket = BUCKET_NAME,
    Key = "test",
    Body = "Encrypted".encode("UTF-8"),
    ServerSideEncryption = "AES256"
)
#2-2. SSE with KMS AWS Managed Keys
response = s3.put_object(
    Bucket = BUCKET_NAME,
    Key = "test",
    Body = "Encrypted".encode("UTF-8"),
    ServerSideEncryption = "aws:kms"
)
#2-3. SSE with KMS CMK(Customer Managed Keys)
response = s3.put_object(
    Bucket = BUCKET_NAME,
    Key = "test",
    Body = "Encrypted".encode("UTF-8"),
    ServerSideEncryption = "aws:kms",
    SSEKMSKeyId = "Your Customer Manged Key ID"
)
#2-4. SSE with a customer-provided key (SSE-C); the key is managed by the client, not by S3 or KMS
response = s3.put_object(
    Bucket = BUCKET_NAME,
    Key = "test",
    Body = "Encrypted".encode("UTF-8"),
    SSECustomerAlgorithm = "AES256",
    SSECustomerKey = "The Key You generated. ex) SSE_CUSTOMER_KEY=$(cat /dev/urandom | base64 -i | fold -w 32 | head -n 1)"
)
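
Note that an object written with SSE-C can only be read back by supplying the same key again; a minimal sketch:

#Reading an SSE-C object requires the same customer-provided key
response = s3.get_object(
    Bucket = BUCKET_NAME,
    Key = "test",
    SSECustomerAlgorithm = "AES256",
    SSECustomerKey = "The Key You generated"
)
body = response["Body"].read().decode("UTF-8")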
