boto3 is the AWS library for Python. It has a low-level API, which is a thin wrapper over the raw service API, and an object-oriented high-level API that wraps it.
Manipulate S3 objects with Boto3 (high-level API and low-level API) --Qiita https://qiita.com/sokutou-metsu/items/5ba7531117224ee5e8af
Until now I had been using the low-level client.list_objects_v2 to list S3 objects, but the corresponding high-level API exists as resource.Bucket().objects.filter. (I hadn't found it myself because the S3 documentation is so huge.)
This article shows that the high-level API reduces the amount of code and is faster, so let's use the high-level API.
Use the new version of the S3 ListObjects API, ListObjects V2 | Developers.IO https://dev.classmethod.jp/cloud/aws/s3-new-api-list-object-v2/
list_objects_v2 fetches at most 1,000 items per call, so pagination handling is required. Here is an example (this code is called recursively):
s3client = self._session().client('s3')
if next_token:
    response = s3client.list_objects_v2(
        Bucket=self.source_bucket,
        Prefix=self.source_prefix,
        ContinuationToken=next_token,
    )
else:
    response = s3client.list_objects_v2(
        Bucket=self.source_bucket,
        Prefix=self.source_prefix,
    )
if 'Contents' in response:
    keys = [i['Key'] for i in response['Contents']]
else:
    keys = []
if 'NextContinuationToken' in response:
    next_token = response['NextContinuationToken']
else:
    next_token = None
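As an aside, the same pagination can also be handled without recursion using boto3's built-in paginator, which follows NextContinuationToken automatically. A minimal sketch, not the article's code; the bucket and prefix arguments are placeholders:

```python
def keys_from_page(page: dict) -> list:
    """Extract keys from one list_objects_v2 response page.
    'Contents' is absent when the page has no objects."""
    return [obj['Key'] for obj in page.get('Contents', [])]


def list_all_keys(bucket: str, prefix: str) -> list:
    """Collect every key under prefix. The paginator follows
    NextContinuationToken internally, so no recursion is needed."""
    import boto3  # local import keeps the pure helper above usable without AWS
    s3client = boto3.session.Session().client('s3')
    paginator = s3client.get_paginator('list_objects_v2')
    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        keys.extend(keys_from_page(page))
    return keys
```

Whether you prefer this or the recursive style is a matter of taste; both issue the same sequence of ListObjectsV2 requests.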
78733 objects → 46 seconds
Executed <function test at 0x10c0743b0> in 46.35232996940613 seconds
Bucket().objects is an ObjectSummary collection, and you specify conditions by chaining filter, all, limit, page_size, etc. on it. The return value is also an ObjectSummary. https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Bucket.objects https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.ObjectSummary
The ObjectSummary itself is an iterator, and the data is actually fetched only when the iterator is consumed. If you specify KeyMarker in the arguments of filter, you can start the listing partway through, and you can also specify RequestPayer and so on. It seems you can do almost everything that list_objects_v2 can do.
s3_resource = self._session().resource('s3')
a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix)
# a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix).limit(count=2000)
b = [k.key for k in a]
78733 objects → 33 seconds
Executed <function test at 0x10191f200> in 33.14992713928223 seconds
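To illustrate the lazy-iteration point above, here is a hedged sketch (function name and arguments are my own, not the article's): chaining filter, page_size, and limit only builds a description of the request, and S3 is queried only when the collection is iterated.

```python
def first_n_keys(bucket_name: str, prefix: str, n: int) -> list:
    """Return at most n keys under prefix via the high-level API.
    The chained calls are lazy: S3 is queried only when the final
    list comprehension iterates over the collection."""
    import boto3  # local import keeps this sketch importable without AWS set up
    s3 = boto3.session.Session().resource('s3')
    objects = (s3.Bucket(bucket_name)
                 .objects
                 .filter(Prefix=prefix)  # server-side prefix filtering
                 .page_size(1000)        # objects fetched per underlying request
                 .limit(n))              # stop after n summaries
    return [summary.key for summary in objects]
```
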
This is throwaway code, so some parts are rough.
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

import boto3
from lauda import stopwatch


@dataclass
class S3Manager:
    source_bucket: str
    source_prefix: str
    profile: Optional[str] = None

    def _session(self):
        s = boto3.session.Session(
            profile_name=self.profile
        )
        return s

    def _list_source(self, *, accumulated=None, next_token=None, func=None):
        s3client = self._session().client('s3')
        if next_token:
            response = s3client.list_objects_v2(
                Bucket=self.source_bucket,
                Prefix=self.source_prefix,
                ContinuationToken=next_token,
            )
        else:
            response = s3client.list_objects_v2(
                Bucket=self.source_bucket,
                Prefix=self.source_prefix,
            )
        if 'Contents' in response:
            keys = [i['Key'] for i in response['Contents']]
        else:
            keys = []
        if 'NextContinuationToken' in response:
            next_token = response['NextContinuationToken']
        else:
            next_token = None
        if func:
            return func(response=response, keys=keys, func=func,
                        next_token=next_token, accumulated=accumulated)

    def _accumulate(self, *, response, keys, func, next_token, accumulated):
        got_keys = (accumulated or []) + keys
        if next_token:
            print(f'searching... current fetch keys are :{len(got_keys)}')
            return self._list_source(accumulated=got_keys, next_token=next_token, func=func)
        else:
            return got_keys

    def list_all(self) -> list:
        return self._list_source(func=self._accumulate)

    def _delete(self, *, response, keys, func, next_token, accumulated):
        if keys:
            print(f'deleting: {self.source_bucket}/{self.source_prefix}')
            # use the configured session so the profile is honoured
            s3client = self._session().client('s3')
            s3client.delete_objects(
                Bucket=self.source_bucket,
                Delete={
                    'Objects': [{'Key': key} for key in keys],
                    'Quiet': False,
                },
            )
        if next_token:
            return self._list_source(next_token=next_token, func=func)

    def delete_all(self) -> None:
        self._list_source(func=self._delete)

    def list_all_test(self):
        s3_resource = self._session().resource('s3')
        a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix)
        b = [k.key for k in a]
        print(len(b))


if __name__ == '__main__':
    os.chdir(Path(__file__).parents[1])

    @stopwatch
    def test():
        s3 = S3Manager(
            source_bucket='bucket',
            source_prefix='Path to search',
        )
        # s3.list_all()
        s3.list_all_test()

    test()
The low-level version passes a function as an argument for extensibility, which may add some overhead. Still, the high-level API is not slower and takes less code to write, so let's use the high-level API.