*** Voici quelques-unes des API répertoriées dans le "AWS SDK for Python" que j'utilise souvent, liées à boto3 (bibliothèque Python). *** *** https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/index.html
*** Veuillez lire le nom de la ressource et les différentes valeurs de réglage, le cas échéant ~ ***
Athena
# -*- coding: utf-8 -*-
import boto3
import time
athena = boto3.client("athena")
class QueryFailed(Exception):
"""
Classe d'exception appelée lorsque l'exécution de la requête Athena échoue
"""
pass
#Exécution de requêtes (traitement asynchrone)
start_query_response = athena.start_query_execution(
QueryString = 'Requete',
QueryExecutionContext={
"Database": "Nom GlueDB utilisé pour interroger"
},
ResultConfiguration={
"OutputLocation" : "s3://Nom du compartiment de sortie/Clé"
}
)
query_execution_id = start_query_response["QueryExecutionId"]
#Vérifier l'état d'exécution de la requête
while True:
query_status = athena.get_query_execution(
QueryExecutionId = query_execution_id
)
query_execution_status = query_status['QueryExecution']['Status']['State']
if query_execution_status == 'SUCCEEDED':
break
elif query_execution_status == "FAILED" or query_execution_status == "CANCELLED":
raise QueryFailed(f"query_execution_status = {query_execution_status}")
time.sleep(10)
#Obtenir le résultat de l'exécution de la requête (uniquement en cas de succès)
query_results = athena.get_query_results(QueryExecutionId = query_execution_id)
CloudWatchLogs
# -*- coding: utf-8 -*-
from datetime import datetime,timezone,timedelta
import os
import boto3
import time
import logging
import traceback
#Paramètres du journal
logger = logging.getLogger()
logger.setLevel(os.getenv('LOG_LEVEL', logging.DEBUG))
logs = boto3.client("logs")
BUCKET_NAME = os.environ["BUCKET_NAME"]
WAITING_TIME = int(os.environ["WAITING_TIME"])
#Définir le fuseau horaire sur l'heure du Japon (JST)
JST = timezone(timedelta(hours=9),"JST")
#Type de date lors de la sortie des journaux vers S3
DATE_FORMAT = "%Y-%m-%d"
def lambda_handler(event, context):
"""
Exportez l'équivalent d'une journée de CloudWatch Logs vers S3.
Le temps cible est le suivant.
AM 00:00:00.000000 ~ PM 23:59:59.999999
"""
try:
#Hier PM23:59:59.999999
tmp_today = datetime.now(JST).replace(hour=0,minute=0,second=0,microsecond=0) - timedelta(microseconds=1)
#Hier AM00:00:00.000000
tmp_yesterday = (tmp_today - timedelta(days=1)) + timedelta(microseconds=1)
#Utilisé comme préfixe lors de la sortie des journaux S3
target_date = tmp_yesterday.strftime(DATE_FORMAT)
#Convertir en type d'horodatage pour la sortie du journal (prendre jusqu'à microsecondes)
today = int(tmp_today.timestamp() * 1000)
yesterday = int(tmp_yesterday.timestamp() * 1000)
#Obtenir CloudWatchLogGroup à partir de la variable d'environnement
logGroups = os.environ["LOG_GROUPS"].split(",")
for logGroupName in logGroups:
try:
keys = ["logGroupName","yesterday","today","target_date"]
values = [logGroupName,yesterday,today,target_date]
payload = dict(zip(keys,values))
#Exécuter la sortie du journal
response = logs.create_export_task(
logGroupName = payload["logGroupName"],
fromTime = payload["yesterday"],
to = payload["today"],
destination = BUCKET_NAME,
destinationPrefix = "Logs" + payload["logGroupName"] + "/" + payload["target_date"]
)
#Attendez que la sortie du journal se termine.
taskId = response["taskId"]
while True:
response = logs.describe_export_tasks(
taskId = taskId
)
status = response["exportTasks"][0]["status"]["code"]
#Pause si l'exécution de la tâche est terminée
if status != "PENDING" and status != "PENDING_CANCEL" and status != "RUNNING":
logger.info(f"taskId {taskId} has finished exporting")
break
else:
logger.info(f"taskId {taskId} is now exporting")
time.sleep(WAITING_TIME)
continue
except Exception as e:
traceback.print_exc()
logger.warning(f"type = {type(e)} , message = {e}",exc_info=True)
except Exception as e:
traceback.print_exc()
logger.error(f"type = {type(e)} , message = {e}",exc_info=True)
raise
DynamoDB
# -*- coding: utf-8 -*-
import boto3
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("table")
dynamodb_client = boto3.client('dynamodb')
autoscaling_client = boto3.client('application-autoscaling')
#Obtenez 1 article
response = table.get_item(
Key = {
"id" : "1"
}
)
#Enregistrer un article
response = table.put_item(
Item = {
"id" : "1",
"key" : "value"
}
)
#Mis à jour 1 article
response = table.update_item(
Key = {
"id" : "1"
},
UpdateExpression = "set key2 = :val",
ExpressionAttributeValues = {
":val" : "value2"
},
ReturnValues = "UPDATED_NEW"
)
#Supprimer 1 élément
response = table.delete_item(
Key = {
"id" : "1"
}
)
#Supprimer tous les éléments (tous les enregistrements) tronquer ou supprimer de la table
#Obtenez toutes les données
delete_items = []
parameters = {}
while True:
response = table.scan(**parameters)
delete_items.extend(response["Items"])
if "LastEvaluatedKey" in response:
parameters["ExclusiveStartKey"] = response["LastEvaluatedKey"]
else:
break
#Extraction de clé
key_names = [ x["AttributeName"] for x in table.key_schema ]
delete_keys = [ { k:v for k,v in x.items() if k in key_names } for x in delete_items ]
#Suprimmer les données
with table.batch_writer() as batch:
for key in delete_keys:
batch.delete_item(Key = key)
#Supprimer la table
response = dynamodb_client.list_tables()
if 'TableNames' in response:
for table_name in response['TableNames']:
if table_name == "Nom de la table à supprimer":
dynamodb_client.delete_table(TableName = table_name)
waiter = dynamodb_client.get_waiter("table_not_exists")
waiter.wait(TableName = table_name)
#Si vous avez le type de suivi de cible Auto Scaling, vous pouvez supprimer CloudWatch Alarm en même temps en supprimant ScalingPolicy.
try:
autoscaling_client.delete_scaling_policy(
PolicyName = f'{table_name}ReadCapacity',
ServiceNamespace = "dynamodb",
ResourceId = f"table/{table_name}",
ScalableDimension = "dynamodb:table:ReadCapacityUnits"
)
except autoscaling_client.exceptions.ObjectNotFoundException as e:
print(f"type = {type(e)}, message = {e}")
try:
autoscaling_client.delete_scaling_policy(
PolicyName = f'{table_name}WriteCapacity',
ServiceNamespace = "dynamodb",
ResourceId = f"table/{table_name}",
ScalableDimension = "dynamodb:table:WriteCapacityUnits"
)
except autoscaling_client.exceptions.ObjectNotFoundException as e:
print(f"type = {type(e)}, message = {e}")
#Créer une table
table_name = "table"
dynamodb.create_table(
TableName = table_name,
KeySchema = [{
"AttributeName" : "id",
"KeyType" : "HASH"
}],
AttributeDefinitions = [{
"AttributeName" : "id",
"AttributeType" : "S"
}],
ProvisionedThroughput = {
"ReadCapacityUnits" : 1,
"WriteCapacityUnits" : 1
}
)
waiter = dynamodb_client.get_waiter("table_exists")
waiter.wait(TableName = table_name)
#Paramètres de mise à l'échelle automatique du type de suivi de la cible
autoscaling_client.register_scalable_target(
ServiceNamespace = "dynamodb",
ResourceId = f"table/{table_name}",
ScalableDimension = "dynamodb:table:ReadCapacityUnits",
MinCapacity = 1,
MaxCapacity = 10,
RoleARN = "ARN de rôle IAM pour Auto Scaling"
)
autoscaling_client.register_scalable_target(
ServiceNamespace = "dynamodb",
ResourceId = f"table/{table_name}",
ScalableDimension = "dynamodb:table:WriteCapacityUnits",
MinCapacity = 1,
MaxCapacity = 10,
RoleARN = "ARN de rôle IAM pour Auto Scaling"
)
#Définir la politique de mise à l'échelle
autoscaling_client.put_scaling_policy(
ServiceNamespace='dynamodb',
ResourceId = f"table/{table_name}",
PolicyType = "TargetTrackingScaling",
PolicyName = f"{table_name}ReadCapacity",
ScalableDimension = "dynamodb:table:ReadCapacityUnits",
TargetTrackingScalingPolicyConfiguration={
"TargetValue" : 70,
"PredefinedMetricSpecification": {
"PredefinedMetricType": "DynamoDBReadCapacityUtilization"
},
"ScaleOutCooldown" : 70,
"ScaleInCooldown" : 70
}
)
autoscaling_client.put_scaling_policy(
ServiceNamespace='dynamodb',
ResourceId = f"table/{table_name}",
PolicyType = "TargetTrackingScaling",
PolicyName = f"{table_name}WriteCapacity",
ScalableDimension='dynamodb:table:WriteCapacityUnits',
TargetTrackingScalingPolicyConfiguration={
"TargetValue" : 70,
"PredefinedMetricSpecification": {
"PredefinedMetricType": "DynamoDBWriteCapacityUtilization"
},
"ScaleOutCooldown" : 70,
"ScaleInCooldown" : 70
}
)
#Mise à jour du schéma de table
response = dynamodb_client.update_table(
AttributeDefinitions = [
{
'AttributeName': 'string',
'AttributeType': 'S'|'N'|'B'
},
],
TableName = 'string',
BillingMode = 'PROVISIONED'|'PAY_PER_REQUEST',
ProvisionedThroughput = {
'ReadCapacityUnits': 123,
'WriteCapacityUnits': 123
}
)
#Le traitement par lots
table = dynamodb.Table("table")
with table.batch_writer() as batch:
for i in range(10 ** 6):
batch.put_item(
Item = {
"id" : str(i + 1),
"key" : f"key{i + 1}"
}
)
Kinesis Data Stream
# -*- coding: utf-8 -*-
import boto3
kinesis = boto3.client("kinesis")
#Transmission de données
kinesis.put_records(
Records = [
{
"Data" : b"String",
"PartitionKey" : "String"
}
],
StreamName = "Kinesis Data Stream Name"
)
Lambda
# -*- coding: utf-8 -*-
import boto3
lambda_client = boto3.client('lambda ')
#Exécution Lambda
response = lambda_client.invoke(
FunctionName = 'Lambda à traiter',
InvocationType = 'Event'|'RequestResponse'|'DryRun',
LogType = 'None'|'Tail',
Payload = b'bytes'|file
)
#Activer ou désactiver les mappages de source d'événements
import time
expected_status = "Enabled" #En cas d'invalidation, désactivé
progressing_status = "Enabling" #En cas d'invalidation, désactivation
Enabled = True #Faux pour invalidation
response = lambda_client.list_event_source_mappings(
EventSourceArn = "Kinesis ou DynamoDB ARN à traiter",
FunctionName = "Nom de la fonction Lambda à traiter"
)
if "EventSourceMappings" in response:
for e in response["EventSourceMappings"]:
if e["State"] != expected_status or e["State"] != progressing_status:
response = lambda_client.update_event_source_mapping(
UUID = e["UUID"],
FunctionName = e["FunctionArn"],
Enabled = Enabled,
BatchSize = 100
)
if response["State"] != expected_status:
while True:
response = lambda_client.get_event_source_mapping(
UUID = e["UUID"]
)
if response["State"] == expected_status:
break
time.sleep(10)
SageMaker
# -*- coding: utf-8 -*-
import boto3
sagemaker = boto3.client("sagemaker-runtime")
#Accédez au point de terminaison sagemaker et recevez les résultats de prédiction
response = sagemaker.invoke_endpoint(
EndpointName = "Nom du point de terminaison SageMaker",
Body=b'bytes'|file,
ContentType = 'text/csv', #The MIME type of the input data in the request body.
Accept = 'application/json' #The desired MIME type of the inference in the response.
)
SQS
# -*- coding: utf-8 -*-
import boto3
sqs = boto3.client('sqs')
QUEUE_URL= "URL de la file d'attente SQS. Vous pouvez le vérifier sur la console."
#Récupérez tous les messages de la file d'attente SQS.
while True:
sqs_message = sqs.receive_message(
QueueUrl = QUEUE_URL,
MaxNumberOfMessages = 10
)
if "Messages" in sqs_message:
for message in sqs_message["Messages"]:
try:
print(message)
#Supprimez le message acquis. Sinon, vous risquez d'obtenir des doublons
sqs.delete_message(
QueueUrl = QUEUE_URL,
ReceiptHandle = message["ReceiptHandle"]
)
except Exception as e:
print(f"type = {type(e)} , message = {e}")
S3
# -*- coding: utf-8 -*-
import boto3
s3 = boto3.client('s3')
BUCKET_NAME = "Nom du bucket à traiter"
#1 Écriture d'objet (fichier)
s3.put_object(
Bucket = BUCKET_NAME,
Body = "Contenu des données. type str ou type d'octets",
Key = "Clé S3. s3:://Nom du godet/Répertoires et noms de fichiers suivants"
)
#1 Lire l'objet (fichier)
s3.get_object(
Bucket = BUCKET_NAME,
Key = "Clé S3. s3:://Nom du godet/Répertoires et noms de fichiers suivants"
)
#1 Supprimer l'objet (fichier)
s3.delete_object(
Bucket = BUCKET_NAME,
Key = "Clé S3. s3:://Nom du godet/Répertoires et noms de fichiers suivants"
)
#Copier un objet (fichier) dans un autre emplacement
s3.copy_object(
Bucket = BUCKET_NAME,
Key = "Clé de destination S3. s3:://Nom du godet/Répertoires et noms de fichiers suivants",
CopySource = {
"Bucket" : "Nom du compartiment source",
"Key" : "Clé S3 de la source de migration"
}
)
#Obtenir tous les objets (fichiers) sous le préfixe spécifié ou dans le compartiment
BUCKET_NAME= "Nom du bucket à traiter"
contents = []
kwargs = {
"Bucket" : BUCKET_NAME,
"Prefix" : "Préfixe de la cible de recherche"
}
while True:
response = s3.list_objects_v2(**kwargs)
if "Contents" in response:
contents.extend(response["Contents"])
if 'NextContinuationToken' in response:
kwargs["ContinuationToken"] = response['NextContinuationToken']
continue
break
Recommended Posts