*** Im Folgenden sind einige der APIs aufgeführt, die im "AWS SDK für Python" aufgeführt sind und die ich häufig verwende und die sich auf boto3 (Python-Bibliothek) beziehen. *** *** https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/index.html
*** Bitte lesen Sie den Ressourcennamen und die entsprechenden Einstellungswerte entsprechend ~ ***
Athena
# -*- coding: utf-8 -*-
import boto3
import time
athena = boto3.client("athena")
class QueryFailed(Exception):
"""
Ausnahmeklasse, die aufgerufen wird, wenn die Ausführung der Athena-Abfrage fehlschlägt
"""
pass
#Abfrageausführung (asynchrone Verarbeitung)
start_query_response = athena.start_query_execution(
QueryString = 'Abfrage',
QueryExecutionContext={
"Database": "GlueDB-Name, der zum Abfragen verwendet wird"
},
ResultConfiguration={
"OutputLocation" : "s3://Name des Ausgabe-Buckets/Schlüssel"
}
)
query_execution_id = start_query_response["QueryExecutionId"]
#Überprüfen Sie den Ausführungsstatus der Abfrage
while True:
query_status = athena.get_query_execution(
QueryExecutionId = query_execution_id
)
query_execution_status = query_status['QueryExecution']['Status']['State']
if query_execution_status == 'SUCCEEDED':
break
elif query_execution_status == "FAILED" or query_execution_status == "CANCELLED":
raise QueryFailed(f"query_execution_status = {query_execution_status}")
time.sleep(10)
#Ergebnis der Abfrageausführung abrufen (nur bei Erfolg)
query_results = athena.get_query_results(QueryExecutionId = query_execution_id)
CloudWatchLogs
# -*- coding: utf-8 -*-
from datetime import datetime,timezone,timedelta
import os
import boto3
import time
import logging
import traceback
#Protokolleinstellungen
logger = logging.getLogger()
logger.setLevel(os.getenv('LOG_LEVEL', logging.DEBUG))
logs = boto3.client("logs")
BUCKET_NAME = os.environ["BUCKET_NAME"]
WAITING_TIME = int(os.environ["WAITING_TIME"])
#Stellen Sie die Zeitzone auf Japanische Zeit (JST) ein.
JST = timezone(timedelta(hours=9),"JST")
#Datumstyp bei der Ausgabe von Protokollen an S3
DATE_FORMAT = "%Y-%m-%d"
def lambda_handler(event, context):
"""
Ausgabe von CloudWatch-Protokollen im Wert von einem Tag an S3.
Die Zielzeit ist wie folgt.
AM 00:00:00.000000 ~ PM 23:59:59.999999
"""
try:
#Gestern PM23:59:59.999999
tmp_today = datetime.now(JST).replace(hour=0,minute=0,second=0,microsecond=0) - timedelta(microseconds=1)
#Gestern AM00:00:00.000000
tmp_yesterday = (tmp_today - timedelta(days=1)) + timedelta(microseconds=1)
#Wird als Präfix bei der Ausgabe von S3-Protokollen verwendet
target_date = tmp_yesterday.strftime(DATE_FORMAT)
#Für die Protokollausgabe in Zeitstempeltyp konvertieren (bis zu Mikrosekunden dauern)
today = int(tmp_today.timestamp() * 1000)
yesterday = int(tmp_yesterday.timestamp() * 1000)
#Holen Sie sich CloudWatchLogGroup von der Umgebungsvariablen
logGroups = os.environ["LOG_GROUPS"].split(",")
for logGroupName in logGroups:
try:
keys = ["logGroupName","yesterday","today","target_date"]
values = [logGroupName,yesterday,today,target_date]
payload = dict(zip(keys,values))
#Protokollausgabe ausführen
response = logs.create_export_task(
logGroupName = payload["logGroupName"],
fromTime = payload["yesterday"],
to = payload["today"],
destination = BUCKET_NAME,
destinationPrefix = "Logs" + payload["logGroupName"] + "/" + payload["target_date"]
)
#Warten Sie, bis die Protokollausgabe abgeschlossen ist.
taskId = response["taskId"]
while True:
response = logs.describe_export_tasks(
taskId = taskId
)
status = response["exportTasks"][0]["status"]["code"]
#Unterbrechen Sie die Ausführung der Aufgabe
if status != "PENDING" and status != "PENDING_CANCEL" and status != "RUNNING":
logger.info(f"taskId {taskId} has finished exporting")
break
else:
logger.info(f"taskId {taskId} is now exporting")
time.sleep(WAITING_TIME)
continue
except Exception as e:
traceback.print_exc()
logger.warning(f"type = {type(e)} , message = {e}",exc_info=True)
except Exception as e:
traceback.print_exc()
logger.error(f"type = {type(e)} , message = {e}",exc_info=True)
raise
DynamoDB
# -*- coding: utf-8 -*-
import boto3
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("table")
dynamodb_client = boto3.client('dynamodb')
autoscaling_client = boto3.client('application-autoscaling')
#Holen Sie sich 1 Artikel
response = table.get_item(
Key = {
"id" : "1"
}
)
#Registrieren Sie einen Artikel
response = table.put_item(
Item = {
"id" : "1",
"key" : "value"
}
)
#1 Artikel aktualisiert
response = table.update_item(
Key = {
"id" : "1"
},
UpdateExpression = "set key2 = :val",
ExpressionAttributeValues = {
":val" : "value2"
},
ReturnValues = "UPDATED_NEW"
)
#Löschen Sie einen Eintrag
response = table.delete_item(
Key = {
"id" : "1"
}
)
#Alle Elemente löschen (alle Datensätze) abschneiden oder aus der Tabelle löschen
#Holen Sie sich alle Daten
delete_items = []
parameters = {}
while True:
response = table.scan(**parameters)
delete_items.extend(response["Items"])
if "LastEvaluatedKey" in response:
parameters["ExclusiveStartKey"] = response["LastEvaluatedKey"]
else:
break
#Schlüsselextraktion
key_names = [ x["AttributeName"] for x in table.key_schema ]
delete_keys = [ { k:v for k,v in x.items() if k in key_names } for x in delete_items ]
#Daten löschen
with table.batch_writer() as batch:
for key in delete_keys:
batch.delete_item(Key = key)
#Tabelle löschen
response = dynamodb_client.list_tables()
if 'TableNames' in response:
for table_name in response['TableNames']:
if table_name == "Tabellenname, der gelöscht werden soll":
dynamodb_client.delete_table(TableName = table_name)
waiter = dynamodb_client.get_waiter("table_not_exists")
waiter.wait(TableName = table_name)
#Wenn Sie über die automatische Skalierung des Zielverfolgungstyps verfügen, können Sie CloudWatch Alarm gleichzeitig löschen, indem Sie ScalingPolicy löschen.
try:
autoscaling_client.delete_scaling_policy(
PolicyName = f'{table_name}ReadCapacity',
ServiceNamespace = "dynamodb",
ResourceId = f"table/{table_name}",
ScalableDimension = "dynamodb:table:ReadCapacityUnits"
)
except autoscaling_client.exceptions.ObjectNotFoundException as e:
print(f"type = {type(e)}, message = {e}")
try:
autoscaling_client.delete_scaling_policy(
PolicyName = f'{table_name}WriteCapacity',
ServiceNamespace = "dynamodb",
ResourceId = f"table/{table_name}",
ScalableDimension = "dynamodb:table:WriteCapacityUnits"
)
except autoscaling_client.exceptions.ObjectNotFoundException as e:
print(f"type = {type(e)}, message = {e}")
#Tabelle erstellen
table_name = "table"
dynamodb.create_table(
TableName = table_name,
KeySchema = [{
"AttributeName" : "id",
"KeyType" : "HASH"
}],
AttributeDefinitions = [{
"AttributeName" : "id",
"AttributeType" : "S"
}],
ProvisionedThroughput = {
"ReadCapacityUnits" : 1,
"WriteCapacityUnits" : 1
}
)
waiter = dynamodb_client.get_waiter("table_exists")
waiter.wait(TableName = table_name)
#Einstellungen für die automatische Skalierung des Zielverfolgungstyps
autoscaling_client.register_scalable_target(
ServiceNamespace = "dynamodb",
ResourceId = f"table/{table_name}",
ScalableDimension = "dynamodb:table:ReadCapacityUnits",
MinCapacity = 1,
MaxCapacity = 10,
RoleARN = "IAM-Rollen-ARN für die automatische Skalierung"
)
autoscaling_client.register_scalable_target(
ServiceNamespace = "dynamodb",
ResourceId = f"table/{table_name}",
ScalableDimension = "dynamodb:table:WriteCapacityUnits",
MinCapacity = 1,
MaxCapacity = 10,
RoleARN = "IAM-Rollen-ARN für die automatische Skalierung"
)
#Skalierungsrichtlinie festlegen
autoscaling_client.put_scaling_policy(
ServiceNamespace='dynamodb',
ResourceId = f"table/{table_name}",
PolicyType = "TargetTrackingScaling",
PolicyName = f"{table_name}ReadCapacity",
ScalableDimension = "dynamodb:table:ReadCapacityUnits",
TargetTrackingScalingPolicyConfiguration={
"TargetValue" : 70,
"PredefinedMetricSpecification": {
"PredefinedMetricType": "DynamoDBReadCapacityUtilization"
},
"ScaleOutCooldown" : 70,
"ScaleInCooldown" : 70
}
)
autoscaling_client.put_scaling_policy(
ServiceNamespace='dynamodb',
ResourceId = f"table/{table_name}",
PolicyType = "TargetTrackingScaling",
PolicyName = f"{table_name}WriteCapacity",
ScalableDimension='dynamodb:table:WriteCapacityUnits',
TargetTrackingScalingPolicyConfiguration={
"TargetValue" : 70,
"PredefinedMetricSpecification": {
"PredefinedMetricType": "DynamoDBWriteCapacityUtilization"
},
"ScaleOutCooldown" : 70,
"ScaleInCooldown" : 70
}
)
#Aktualisierung des Tabellenschemas
response = dynamodb_client.update_table(
AttributeDefinitions = [
{
'AttributeName': 'string',
'AttributeType': 'S'|'N'|'B'
},
],
TableName = 'string',
BillingMode = 'PROVISIONED'|'PAY_PER_REQUEST',
ProvisionedThroughput = {
'ReadCapacityUnits': 123,
'WriteCapacityUnits': 123
}
)
#Stapelverarbeitung
table = dynamodb.Table("table")
with table.batch_writer() as batch:
for i in range(10 ** 6):
batch.put_item(
Item = {
"id" : str(i + 1),
"key" : f"key{i + 1}"
}
)
Kinesis Data Stream
# -*- coding: utf-8 -*-
import boto3
kinesis = boto3.client("kinesis")
#Datenübertragung
kinesis.put_records(
Records = [
{
"Data" : b"String",
"PartitionKey" : "String"
}
],
StreamName = "Kinesis Data Stream Name"
)
Lambda
# -*- coding: utf-8 -*-
import boto3
lambda_client = boto3.client('lambda ')
#Lambda rennen
response = lambda_client.invoke(
FunctionName = 'Lambda verarbeitet werden',
InvocationType = 'Event'|'RequestResponse'|'DryRun',
LogType = 'None'|'Tail',
Payload = b'bytes'|file
)
#Ereignisquellenzuordnungen aktivieren oder deaktivieren
import time
expected_status = "Enabled" #Im Falle einer Ungültigmachung deaktiviert
progressing_status = "Enabling" #Im Falle einer Ungültigmachung deaktivieren
Enabled = True #Falsch für die Ungültigmachung
response = lambda_client.list_event_source_mappings(
EventSourceArn = "Kinesis oder DynamoDB ARN werden verarbeitet",
FunctionName = "Name der zu verarbeitenden Lambda-Funktion"
)
if "EventSourceMappings" in response:
for e in response["EventSourceMappings"]:
if e["State"] != expected_status or e["State"] != progressing_status:
response = lambda_client.update_event_source_mapping(
UUID = e["UUID"],
FunctionName = e["FunctionArn"],
Enabled = Enabled,
BatchSize = 100
)
if response["State"] != expected_status:
while True:
response = lambda_client.get_event_source_mapping(
UUID = e["UUID"]
)
if response["State"] == expected_status:
break
time.sleep(10)
SageMaker
# -*- coding: utf-8 -*-
import boto3
sagemaker = boto3.client("sagemaker-runtime")
#Greifen Sie auf den Sagemaker-Endpunkt zu und erhalten Sie die Vorhersageergebnisse
response = sagemaker.invoke_endpoint(
EndpointName = "Name des SageMaker-Endpunkts",
Body=b'bytes'|file,
ContentType = 'text/csv', #The MIME type of the input data in the request body.
Accept = 'application/json' #The desired MIME type of the inference in the response.
)
SQS
# -*- coding: utf-8 -*-
import boto3
sqs = boto3.client('sqs')
QUEUE_URL= "URL der SQS-Warteschlange. Sie können es auf der Konsole überprüfen."
#Holen Sie sich alle Nachrichten aus der SQS-Warteschlange.
while True:
sqs_message = sqs.receive_message(
QueueUrl = QUEUE_URL,
MaxNumberOfMessages = 10
)
if "Messages" in sqs_message:
for message in sqs_message["Messages"]:
try:
print(message)
#Löschen Sie die erfasste Nachricht. Andernfalls erhalten Sie möglicherweise Duplikate
sqs.delete_message(
QueueUrl = QUEUE_URL,
ReceiptHandle = message["ReceiptHandle"]
)
except Exception as e:
print(f"type = {type(e)} , message = {e}")
S3
# -*- coding: utf-8 -*-
import boto3
s3 = boto3.client('s3')
BUCKET_NAME = "Zu verarbeitender Bucket-Name"
#1 Objekt (Datei) schreiben
s3.put_object(
Bucket = BUCKET_NAME,
Body = "Dateninhalt. str-Typ oder Byte-Typ",
Key = "S3-Taste. s3:://Eimername/Nachfolgende Verzeichnisse und Dateinamen"
)
#1 Objekt (Datei) lesen
s3.get_object(
Bucket = BUCKET_NAME,
Key = "S3-Taste. s3:://Eimername/Nachfolgende Verzeichnisse und Dateinamen"
)
#1 Objekt (Datei löschen)
s3.delete_object(
Bucket = BUCKET_NAME,
Key = "S3-Taste. s3:://Eimername/Nachfolgende Verzeichnisse und Dateinamen"
)
#Kopieren Sie ein Objekt (eine Datei) an einen anderen Speicherort
s3.copy_object(
Bucket = BUCKET_NAME,
Key = "Ziel S3-Taste. s3:://Eimername/Nachfolgende Verzeichnisse und Dateinamen",
CopySource = {
"Bucket" : "Name des Quell-Buckets",
"Key" : "S3-Schlüssel der Migrationsquelle"
}
)
#Holen Sie sich alle Objekte (Dateien) unter dem angegebenen Präfix oder in den Bucket
BUCKET_NAME= "Zu verarbeitender Bucket-Name"
contents = []
kwargs = {
"Bucket" : BUCKET_NAME,
"Prefix" : "Suchzielpräfix"
}
while True:
response = s3.list_objects_v2(**kwargs)
if "Contents" in response:
contents.extend(response["Contents"])
if 'NextContinuationToken' in response:
kwargs["ContinuationToken"] = response['NextContinuationToken']
continue
break
Recommended Posts