[Python] Use Bucket().objects.filter instead of list_objects_v2 when listing S3 objects with Boto3

Low-level API and high-level API

boto3, the AWS library for Python, provides two flavors of API: a low-level API that maps directly to the service operations, and an object-oriented high-level API that wraps it.

Manipulate S3 objects with Boto3 (high-level API and low-level API) --Qiita https://qiita.com/sokutou-metsu/items/5ba7531117224ee5e8af
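For reference, both flavors hang off the same session object; here is a minimal sketch (the bucket name is a placeholder):

import boto3

session = boto3.session.Session()  # profile_name='...' can be passed here

# Low-level API: a client whose methods map 1:1 to S3 service operations
s3client = session.client('s3')

# High-level API: an object-oriented resource that wraps the client
s3resource = session.resource('s3')
bucket = s3resource.Bucket('bucket')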

Until now I had been using the low-level client.list_objects_v2 to list S3 objects, but resource.Bucket().objects.filter exists as the corresponding high-level API. (I hadn't found it myself because the S3 documentation is so huge.)

The point of this article is that the high-level API takes less code and runs faster, so let's use it.

Low-level API

Use the new version of the S3 ListObjects API, ListObjects V2 | Developers.IO https://dev.classmethod.jp/cloud/aws/s3-new-api-list-object-v2/

list_objects_v2 fetches at most 1,000 objects per call, so pagination handling is required. Here is an example (this snippet is called recursively):

        s3client = self._session().client('s3')
        if next_token:
            # Subsequent pages: pass the token returned by the previous call
            response = s3client.list_objects_v2(
                Bucket=self.source_bucket,
                Prefix=self.source_prefix,
                ContinuationToken=next_token,
            )
        else:
            # First page: no continuation token yet
            response = s3client.list_objects_v2(
                Bucket=self.source_bucket,
                Prefix=self.source_prefix,
            )

        # 'Contents' is absent when no objects matched
        if 'Contents' in response:
            keys = [i['Key'] for i in response['Contents']]
        else:
            keys = []

        # 'NextContinuationToken' is present only while results are truncated
        if 'NextContinuationToken' in response:
            next_token = response['NextContinuationToken']
        else:
            next_token = None
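For reference, the low-level API also ships a paginator that takes care of the token handling, if you prefer to avoid hand-rolled recursion; a minimal sketch (bucket and prefix are placeholders):

import boto3

s3client = boto3.session.Session().client('s3')
paginator = s3client.get_paginator('list_objects_v2')

keys = []
for page in paginator.paginate(Bucket='bucket', Prefix='prefix'):
    # 'Contents' is absent on pages with no matching objects
    keys.extend(obj['Key'] for obj in page.get('Contents', []))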

Speed

78,733 objects → 46 seconds

Executed <function test at 0x10c0743b0> in 46.35232996940613 seconds

High-level API

Bucket().objects is a collection of ObjectSummary objects; you narrow it down by chaining filter, all, limit, page_size, and so on, and each of these returns an ObjectSummary collection as well.
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Bucket.objects
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.ObjectSummary

The collection itself is an iterator, and the data is only fetched when you actually iterate over it. Passing Marker to filter lets you start listing from the middle of the key space, you can set RequestPayer, and so on; it seems you can do almost everything list_objects_v2 can.

        s3_resource = self._session().resource('s3')
        a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix)
        # a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix).limit(count=2000)
        b = [k.key for k in a]
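The lazy fetching and the Marker search mentioned above look like this; a minimal sketch (bucket, prefix, and the marker key are placeholders):

import boto3

objects = boto3.session.Session().resource('s3').Bucket('bucket').objects

# Marker starts the listing after the given key; page_size controls how many
# keys each underlying request fetches (the S3 cap is 1000)
subset = objects.filter(Prefix='prefix', Marker='prefix/middle-key').page_size(1000)

# Nothing is fetched until the collection is actually iterated
for summary in subset:
    print(summary.key, summary.size)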

Speed

78,733 objects → 33 seconds

Executed <function test at 0x10191f200> in 33.14992713928223 seconds

Full source code

This is throwaway code, so some parts are rough.

import os
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

import boto3
from lauda import stopwatch


@dataclass
class S3Manager:
    source_bucket: str
    source_prefix: str
    profile: Optional[str] = None

    def _session(self):
        s = boto3.session.Session(
            profile_name=self.profile
        )
        return s

    def _list_source(self, *, accumulated=None, next_token=None, func=None):
        # Fetch one page of keys, then delegate to `func`, which decides
        # whether to recurse with the next continuation token
        s3client = self._session().client('s3')
        if next_token:
            response = s3client.list_objects_v2(
                Bucket=self.source_bucket,
                Prefix=self.source_prefix,
                ContinuationToken=next_token,
            )
        else:
            response = s3client.list_objects_v2(
                Bucket=self.source_bucket,
                Prefix=self.source_prefix,
            )

        if 'Contents' in response:
            keys = [i['Key'] for i in response['Contents']]
        else:
            keys = []

        if 'NextContinuationToken' in response:
            next_token = response['NextContinuationToken']
        else:
            next_token = None

        if func:
            return func(response=response, keys=keys, func=func, next_token=next_token, accumulated=accumulated)

    def _accumulate(self, *, response, keys, func, next_token, accumulated):
        # Collect keys across pages, recursing while a continuation token remains
        got_keys = (accumulated or []) + keys
        if next_token:
            print(f'searching... current fetch keys are :{len(got_keys)}')
            return self._list_source(accumulated=got_keys, next_token=next_token, func=func)
        else:
            return got_keys

    def list_all(self) -> list:
        return self._list_source(func=self._accumulate)

    def _delete(self, *, response, keys, func, next_token, accumulated):
        # Delete each fetched page of keys (delete_objects takes up to 1000 keys per call)
        if keys:
            print(f'deleting: {self.source_bucket}/{self.source_prefix}')
            s3client = self._session().client('s3')
            s3client.delete_objects(
                Bucket=self.source_bucket,
                Delete={
                    'Objects': [{'Key': key} for key in keys],
                    'Quiet': False
                },
            )

        if next_token:
            return self._list_source(next_token=next_token, func=func)

    def delete_all(self) -> None:
        self._list_source(func=self._delete)

    def list_all_test(self):
        s3_resource = self._session().resource('s3')
        a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix)
        b = [k.key for k in a]
        print(len(b))


if __name__ == '__main__':
    os.chdir(Path(__file__).parents[1])

    @stopwatch
    def test():
        s3 = S3Manager(
            source_bucket='bucket',
            source_prefix='Path to search',
        )
        # s3.list_all()
        s3.list_all_test()

    test()
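Incidentally, the high-level API can shorten delete_all in the same way: object collections have a batch delete() action that chunks delete_objects calls (up to 1,000 keys each) internally. A minimal sketch (bucket and prefix are placeholders):

import boto3

bucket = boto3.session.Session().resource('s3').Bucket('bucket')
# Deletes every object under the prefix; batching is handled internally
bucket.objects.filter(Prefix='Path to search').delete()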

Summary

In the low-level version I pass a function around for extensibility, which may add some overhead. Even so, the high-level API is no slower (it was faster here) and takes far less code, so let's use the high-level API.
