Lambda function to take AMI backup (python)

Various improvements are planned in the future.

Please refer to the following for settings etc.

# -*- coding: utf-8 -*-

import json

import boto3
from boto3.session import Session

import time
from datetime import datetime as dt

import pprint

TAG_KEY_AUTO_BACKUP       = 'Backup-Type'
TAG_VAL_AUTO_BACKUP       = 'auto'

print('Loading function')

pp = pprint.PrettyPrinter(indent=4)

#Function name: lambda_handler
def lambda_handler(event, context):
    print("Received event: " + json.dumps(event, indent=2))

    ec2_client   = boto3.client('ec2')
    ec2_resource = boto3.resource('ec2')

    ret = execute_ami_backup_task(ec2_client, ec2_resource)
    print 'AMI buckup task is completed(%s).' % (ret)

    return 0
    raise Exception('Something went wrong')

#Function name: execute_ami_backup_task
#Return value: Execution result
#Arguments: ec2_client
#       : ec2_resource
#Function: Perform AMI backup
def execute_ami_backup_task(ec2_client, ec2_resource):
    response = ec2_client.describe_instances()

    exec_time ='%Y%m%d%H%M%S')

    result = True
    for ec2_group in response['Reservations']:
        for instance_info in ec2_group['Instances']:
            ret = is_target(instance_info)
            if (ret == False):
            ret = create_buckup_image(ec2_client, ec2_resource, instance_info, exec_time)
            if not ret:
                print 'create_buckup_image(%s) was failed.' % (instance_info['InstanceId'])
                result = False


            ret = delete_old_image(ec2_client, ec2_resource, instance_info)
            if not ret:
                print 'delete_old_image(%s) was failed.' % (instance_info['InstanceId'])
                result = False

    return result

#Function name: is_target
#Return value: Backup required
#Arguments: instance_info <dict>
#Function: Judge the necessity of backup
def is_target(instance_info):
    val = get_tag_value(

    if val is None:
        return False

    return True

#Function name: get_tag_value
#Return value: Tag value (None if there is no match for the key)
#Arguments: instance_info <dict>
#       : key <str>
#Function: Get the tag value of the specified key from the instance information
def get_tag_value(instance_info, key):
    tags = instance_info['Tags']
    for tag in tags:
        if not (key == tag['Key']):
        return tag['Value']

    return None

#Function name: create_buckup_image
#Return value: Execution result
#Arguments: ec2_client
#       : ec2_resource
#       : instance_info <dict>
#       : exec_time <str>
#Function: Create a backup image
def create_buckup_image(ec2_client, ec2_resource, instance_info, exec_time):
    inst_id = instance_info['InstanceId']
    name    = get_tag_value(instance_info, 'Name')
    if name is None:
        print('Get name error!!')
        return False

    image_name = name + '-' + exec_time

    response = ec2_client.create_image(
        InstanceId  = inst_id,
        Name        = image_name,
        Description = image_name,
        NoReboot    = True

    image_id = response['ImageId']
    print '%s was created.' % (image_id)

    #Just in case, wait until the image & snapshot to be tagged is completed.

    tags  = construct_backup_tags(instance_info)
    image = ec2_resource.Image(image_id)

    set_tags_to_image(image, tags)

    set_tags_to_snapshot(ec2_resource, image, tags, image_name)

    return True

#Function name: construct_backup_tags
#Return value: Tags group
#Arguments: instance_info <dict>
#Function: Configure a group of tags for back-up settings
def construct_backup_tags(instance_info):
    ret_tags = []
    tags = instance_info['Tags']
    for tag in tags:
        if (TAG_KEY_BACKUP_GENERATION == tag['Key']):


    t = {u'Value': TAG_VAL_AUTO_BACKUP, u'Key': TAG_KEY_AUTO_BACKUP}

    return ret_tags

#Function name: set_tags_to_image
#Return value: non
#Argument: image
#       : tags <list>
#Function: Set tag information on the AMI image
def set_tags_to_image(image, tags):
    image.create_tags(Tags = tags)


#Function name: set_tags_to_snapshot
#Return value: non
#Arguments: ec2_resource
#       : image
#       : tags <list>
#       : image_name <str>
#Function: Set tag information in the snapshot
def set_tags_to_snapshot(ec2_resource, image, tags, image_name):
    for dev in image.block_device_mappings:
        #Not applicable except for EBS
        if not dev.has_key('Ebs'):

        #Deleted once to replace the Name tag
        name_idx = get_name_tag_index(tags)

        #Name tag setting
        dev_name = dev['DeviceName'][5:]
        name = image_name + '-' + dev_name
        t = {u'Value': name, u'Key': 'Name'}
        snapshot_id = dev['Ebs']['SnapshotId']
        snapshot = ec2_resource.Snapshot(snapshot_id)

        snapshot.create_tags(Tags = tags)


#Function name: get_name_tag_index
#Return value: Index position of Name tag (None if there is no match for the key)
#Arguments: tags<list>
#Function: Get the index position of the Name tag in the tag list
def get_name_tag_index(tags):
    idx = 0
    for tag in tags:
        if tag['Key'] == 'Name':
            return idx

        idx += 1

    return None

#Function name: delete_old_image
#Return value: Execution result
#Arguments: ec2_client
#       : ec2_resource
#       : instance_info <dict>
#Function: Deletes images older than the retention generation
def delete_old_image(ec2_client, ec2_resource, instance_info):
    sorted_images = get_sorted_images(ec2_client, instance_info)

    generation = int(get_tag_value(instance_info, TAG_KEY_BACKUP_GENERATION))
    cnt = 0
    for img in sorted_images:
        cnt += 1
        if generation >= cnt:

        image_id  = img['ImageId']
        snapshots = get_snapshots(ec2_resource, image_id)

        #Release the AMI image
            ImageId = image_id
        print '%s was deregistered.' % (image_id)

        #Wait until the release is complete

        #Delete the corresponding snapshot
        for snapshot in snapshots:
            print '%s was deleted.' % (snapshot)

    return True

#Function name: get_sorted_images
#Return value: Sorted image<list>
#Arguments: ec2_client
#       : instance_info <dict>
#Function: Get the AMI image list sorted in the order of creation
def get_sorted_images(ec2_client, instance_info):
    sorted_images = []
    name     = get_tag_value(instance_info, 'Name')
    response = ec2_client.describe_images(
        Owners  = ['self'],
        Filters = [{'Name': 'tag:Name',                   'Values': [name]},
                   {'Name': 'tag:' + TAG_KEY_AUTO_BACKUP, 'Values': [TAG_VAL_AUTO_BACKUP]}]

    images = response['Images']
    sorted_images = sorted(
        key = lambda x: x['CreationDate'], 
        reverse = True

    return sorted_images

#Function name: get_snapshots
#Return value: Snapshot group<list>
#Arguments: ec2_resource
#       : image_id <str>
#Function: Acquires snapshots included in the AMI image
def get_snapshots(ec2_resource, image_id):
    snapshots = []
    image     = ec2_resource.Image(image_id)

    for dev in image.block_device_mappings:
        if not dev.has_key('Ebs'):

        snapshot_id = dev['Ebs']['SnapshotId']
        snapshot    = ec2_resource.Snapshot(snapshot_id)

    return snapshots

