Boto3 ist die Python-Bibliothek von aws, verfügt jedoch über eine Low-Level-API, die eine naive API ist, und eine objektorientierte High-Level-API, die sie umschließt.
Bearbeiten von S3-Objekten mit Boto3 (High-Level-API und Low-Level-API) --Qiita https://qiita.com/sokutou-metsu/items/5ba7531117224ee5e8af
Bisher wurde client.list_objects_v2, eine Low-Level-API, verwendet, um die Liste von S3 auszugeben, aber resource.Bucket (). Objects.filter ist als entsprechende High-Level-API vorhanden. (Ich konnte mich nicht finden, weil das Material von s3 zu groß war)
Es ist ein Artikel, in dem die Anzahl der Beschreibungen reduziert und die Geschwindigkeit durch die Verwendung der High-Level-API erhöht wird. Verwenden wir also die High-Level-API.
Verwenden Sie die neue Version der S3 ListObjects-API, ListObjects V2 | Developers.IO https://dev.classmethod.jp/cloud/aws/s3-new-api-list-object-v2/
In list_objects_v2 werden 1000 Elemente gleichzeitig abgerufen. Dies ist ein Beispiel, da eine Paginierungsverarbeitung erforderlich ist. (Diese Beschreibung rekursiv aufrufen)
s3client = self._session().client('s3')
if next_token:
response = s3client.list_objects_v2(
Bucket=self.source_bucket,
Prefix=self.source_prefix,
ContinuationToken=next_token,
)
else:
response = s3client.list_objects_v2(
Bucket=self.source_bucket,
Prefix=self.source_prefix,
)
if 'Contents' in response:
keys = [i['Key'] for i in response['Contents']]
else:
keys = []
if 'NextContinuationToken' in response:
next_token = response['NextContinuationToken']
else:
next_token = None
78733 Objekt → 46 Sekunden
Executed <function test at 0x10c0743b0> in 46.35232996940613 seconds
Bucket (). Objects ist ein ObjectSummary-Typ, und Attribute werden hier durch Verketten von Filter, all, limit, page_size usw. angegeben. Der Rückgabewert ist auch ObjectSummary https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Bucket.objects https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.ObjectSummary
Die ObjectSummary selbst ist ein Iterator, und die Daten werden tatsächlich zum Zeitpunkt des Aufrufs des Iterators erfasst. Wenn Sie im Argument des Filters "KeyMarker" angeben, können Sie von der Mitte aus suchen, RequestPayer angeben usw. Es scheint, dass Sie fast alles tun können, was Sie mit list_objects_v2 tun können.
s3_resource = self._session().resource('s3')
a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix)
# a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix).limit(count=2000)
b = [k.key for k in a]
78733 Objekt → 33 Sekunden
Executed <function test at 0x10191f200> in 33.14992713928223 seconds
Da es sich um einen Abschreibungscode handelt, gibt es einige geeignete Teile
import os
from pathlib import Path
from typing import Optional
import boto3
from dataclasses import dataclass
from lauda import stopwatch
@dataclass
class S3Manager:
source_bucket: str
source_prefix: str
profile: Optional[str] = None
def _session(self):
s = boto3.session.Session(
profile_name=self.profile
)
return s
def _list_source(self, *, accumulated=None, next_token=None, func=None):
s3client = self._session().client('s3')
if next_token:
response = s3client.list_objects_v2(
Bucket=self.source_bucket,
Prefix=self.source_prefix,
ContinuationToken=next_token,
)
else:
response = s3client.list_objects_v2(
Bucket=self.source_bucket,
Prefix=self.source_prefix,
)
if 'Contents' in response:
keys = [i['Key'] for i in response['Contents']]
else:
keys = []
if 'NextContinuationToken' in response:
next_token = response['NextContinuationToken']
else:
next_token = None
if func:
return func(response=response, keys=keys, func=func, next_token=next_token, accumulated=accumulated)
def _accumulate(self, *, response, keys, func, next_token, accumulated):
got_keys = (accumulated or []) + keys
if next_token:
print(f'searching... current fetch keys are :{len(got_keys)}')
return self._list_source(accumulated=got_keys, next_token=next_token, func=func)
else:
return got_keys
def list_all(self) -> list:
return self._list_source(func=self._accumulate)
def _delete(self, *, response, keys, func, next_token, accumulated):
if keys:
print(f'deleting: {self.source_bucket}/{self.source_prefix}')
s3client = boto3.Session().client('s3')
s3client.delete_objects(
Bucket=self.source_bucket,
Delete={
'Objects': [{'Key': key} for key in keys],
'Quiet': False
},
)
if next_token:
return self._list_source(next_token=next_token, func=func)
def delete_all(self) -> None:
self._list_source(func=self._delete)
def list_all_test(self):
s3_resource = self._session().resource('s3')
a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix)
b = [k.key for k in a]
print(len(b))
if __name__ == '__main__':
os.chdir(Path(__file__).parents[1])
@stopwatch
def test():
s3 = S3Manager(
source_bucket='Eimer',
source_prefix='Pfad zur Suche',
)
# s3.list_all()
s3.list_all_test()
test()
Die untergeordnete API übergibt die Funktion aus Gründen der Erweiterbarkeit, obwohl möglicherweise ein gewisser Overhead anfällt. Die High-Level-API ist nicht langsam und die Beschreibung ist einfach. Verwenden wir also die High-Level-API.
Recommended Posts