[PYTHON] Lors de la sortie d'une liste de S3 avec Boto3, utilisez Bucket (). Objects.filter au lieu de list_objects_v2.

API de bas niveau et API de haut niveau

Boto3 est la bibliothèque python d'aws, mais elle possède une API de bas niveau qui est une API naïve et une API de haut niveau orientée objet qui l'encapsule.

Manipulation d'objets S3 avec Boto3 (API de haut niveau et API de bas niveau) --Qiita https://qiita.com/sokutou-metsu/items/5ba7531117224ee5e8af

Jusqu'à présent, client.list_objects_v2, qui est une API de bas niveau, était utilisé pour afficher la liste de S3, mais resource.Bucket (). Objects.filter existe en tant qu'API de haut niveau correspondante. (Je ne pouvais pas me retrouver car le matériel de s3 était trop énorme)

C'est un article que la quantité de description est réduite et la vitesse est augmentée en utilisant l'API de haut niveau, alors utilisons l'API de haut niveau.

API de bas niveau

Utilisez la nouvelle version de l'API S3 ListObjects, ListObjects V2 | Developers.IO https://dev.classmethod.jp/cloud/aws/s3-new-api-list-object-v2/

Dans list_objects_v2, 1000 éléments sont récupérés à la fois. Ceci est un exemple car il nécessite un traitement de pagination. (Appel de cette description récursivement)

        s3client = self._session().client('s3')
        if next_token:
            response = s3client.list_objects_v2(
                Bucket=self.source_bucket,
                Prefix=self.source_prefix,
                ContinuationToken=next_token,
            )
        else:
            response = s3client.list_objects_v2(
                Bucket=self.source_bucket,
                Prefix=self.source_prefix,
            )

        if 'Contents' in response:
            keys = [i['Key'] for i in response['Contents']]
        else:
            keys = []

        if 'NextContinuationToken' in response:
            next_token = response['NextContinuationToken']
        else:
            next_token = None

la vitesse

78733 Objet → 46 secondes Executed <function test at 0x10c0743b0> in 46.35232996940613 seconds

API de haut niveau

Bucket (). Objects est de type ObjectSummary et les attributs sont spécifiés ici par le filtre de chaînage, all, limit, page_size, etc. La valeur de retour est également ObjectSummary https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Bucket.objects https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.ObjectSummary

L'ObjectSummary lui-même est un itérateur et les données sont en fait acquises au moment de l'appel de l'itérateur. Si vous spécifiez KeyMarker dans l'argument de filter, vous pouvez rechercher à partir du milieu, vous pouvez spécifier RequestPayer, etc. Il semble que vous puissiez faire presque tout ce que vous pouvez faire avec list_objects_v2.

        s3_resource = self._session().resource('s3')
        a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix)
#        a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix).limit(count=2000)
        b = [k.key for k in a]

la vitesse

78733 Objet → 33 secondes Executed <function test at 0x10191f200> in 33.14992713928223 seconds

Code source complet

Puisqu'il s'agit d'un code d'écriture, il existe des parties appropriées

import os
from pathlib import Path
from typing import Optional

import boto3
from dataclasses import dataclass
from lauda import stopwatch


@dataclass
class S3Manager:
    source_bucket: str
    source_prefix: str
    profile: Optional[str] = None

    def _session(self):
        s = boto3.session.Session(
            profile_name=self.profile
        )
        return s

    def _list_source(self, *, accumulated=None, next_token=None, func=None):
        s3client = self._session().client('s3')
        if next_token:
            response = s3client.list_objects_v2(
                Bucket=self.source_bucket,
                Prefix=self.source_prefix,
                ContinuationToken=next_token,
            )
        else:
            response = s3client.list_objects_v2(
                Bucket=self.source_bucket,
                Prefix=self.source_prefix,
            )

        if 'Contents' in response:
            keys = [i['Key'] for i in response['Contents']]
        else:
            keys = []

        if 'NextContinuationToken' in response:
            next_token = response['NextContinuationToken']
        else:
            next_token = None

        if func:
            return func(response=response, keys=keys, func=func, next_token=next_token, accumulated=accumulated)

    def _accumulate(self, *, response, keys, func, next_token, accumulated):
        got_keys = (accumulated or []) + keys
        if next_token:
            print(f'searching... current fetch keys are :{len(got_keys)}')
            return self._list_source(accumulated=got_keys, next_token=next_token, func=func)
        else:
            return got_keys

    def list_all(self) -> list:
        return self._list_source(func=self._accumulate)

    def _delete(self, *, response, keys, func, next_token, accumulated):
        if keys:
            print(f'deleting: {self.source_bucket}/{self.source_prefix}')
            s3client = boto3.Session().client('s3')
            s3client.delete_objects(
                Bucket=self.source_bucket,
                Delete={
                    'Objects': [{'Key': key} for key in keys],
                    'Quiet': False
                },
            )

        if next_token:
            return self._list_source(next_token=next_token, func=func)

    def delete_all(self) -> None:
        self._list_source(func=self._delete)

    def list_all_test(self):
        s3_resource = self._session().resource('s3')
        a = s3_resource.Bucket(self.source_bucket).objects.filter(Prefix=self.source_prefix)
        b = [k.key for k in a]
        print(len(b))


if __name__ == '__main__':
    os.chdir(Path(__file__).parents[1])

    @stopwatch
    def test():
        s3 = S3Manager(
            source_bucket='seau',
            source_prefix='Chemin de recherche',
        )
        # s3.list_all()
        s3.list_all_test()

    test()

Résumé

L'API de niveau inférieur transmet la fonction d'extensibilité, bien qu'il puisse y avoir une surcharge. L'API de haut niveau n'est pas lente et la description est facile, alors utilisons l'API de haut niveau.

Recommended Posts

Lors de la sortie d'une liste de S3 avec Boto3, utilisez Bucket (). Objects.filter au lieu de list_objects_v2.
Utilisez boto3 pour accéder à S3
Comment gérer l'erreur SSL lors de la connexion à S3 avec Python boto
Obtenir une liste d'utilisateurs IAM avec Boto3
[Python] Résumé des opérations sur les fichiers S3 avec boto3
[Memo] Chargez le csv de s3 dans les pandas avec boto3
EP 7 Utiliser les compréhensions de liste au lieu de carte et de filtre
Téléchargeur S3 avec boto
Opération S3 avec python boto3