https://beam.apache.org/documentation/transforms/python/elementwise/pardo/
[Jetez un œil à ceci.
ParDo est une fonction de traitement à usage général pour Tranform en parallèle.
Lorsqu'une PCollection, qui est un bloc de données, est entrée, ParDo applique un traitement arbitraire implémenté par l'utilisateur à chaque élément qu'il contient et convertit le résultat du traitement en une PCollection de sortie.
L'utilisation est comme ça.
ParDo1.py
class SplitWords(beam.DoFn):
def process(self, element):
return [element.split()]
with beam.Pipeline() as p:
len = (
p
| 'Create Data' >> beam.Create(['Cloud Dataflow is a distributed parallel processing service.',
'BigQuery is a powerful Data WareHouse.',
'Cloud Pub/Sub is a scalable messaging service.'])
| 'Split into words' >> beam.ParDo(SplitWords())
| 'Print' >> beam.Map(print)
)
Le processus à effectuer par ParDo est implémenté dans la classe qui hérite de beam.DoFn
.
Écrivez le processus réel dans la méthode process
.
Puisque «élément» pris comme argument devient chaque enregistrement de PCollection, split () est appelé ici pour cette chaîne de caractères.
Enfin, return renvoie le résultat du traitement sous forme de tableau.
yield element.split()
Peut être retourné avec
La sortie ressemble à ceci.
['Cloud', 'Dataflow', 'is', 'a', 'distributed', 'parallel', 'processing', 'service.']
['BigQuery', 'is', 'a', 'powerful', 'Data', 'WareHouse.']
['Cloud', 'Pub/Sub', 'is', 'a', 'scalable', 'messaging', 'service.']
DoFn.setup()
DoFn
Appelé une fois par instance lorsque l'instance est initialisée. Il peut être appelé plus d'une fois par travailleur. Il est bon de faire le traitement de connexion de la base de données et du réseau ici.
DoFn.start_bundle()
Appelé une fois pour des morceaux d'éléments. Appelé avant que process
ne soit appelé sur le premier élément
. Bon pour suivre le début du traitement de morceaux d'éléments.
DoFn.process(element, *args, **kwargs)
Appelé pour chaque «élément». Produit 0 «élément» ou plus. Vous pouvez prendre * args
et ** kwargs
comme arguments via les arguments ParDo.
DoFn.finish_bundle()
Appelé une fois pour des morceaux d'éléments. Appelé après que process
soit appelé sur le dernier élément
. Produit 0 «élément» ou plus. Un bon endroit pour exécuter un lot à la fin d'un morceau, comme exécuter une requête de base de données.
Par exemple, initialisez un lot avec start_bundle
, ajoutez un élément au lot au lieu de renvoyer ou rendement avec process
, et enfin exécutez une requête avec finish_bundle
pour afficher le résultat. Comment utiliser.
DoFn.teardown()
DoFn
Appelé une fois par instance lorsque l'instance se termine. (Cependant, cela semble être le meilleur effort. Cela signifie qu'il ne sera pas exécuté si le worker plante.) Notez que. Idéal pour fermer la base de données et les connexions réseau.
Des informations telles que l'heure (appelée event_time) de l'élément et l'heure de début et l'heure de fin de la fenêtre peuvent être obtenues comme suit.
class AnalyzeElement(beam.DoFn):
def process(self, elem,
timestamp=beam.DoFn.TimestampParam,
window=beam.DoFn.WindowParam):
yield '\n'.join(['# timestamp',
'type(timestamp) -> ' + repr(type(timestamp)),
'timestamp.micros -> ' + repr(timestamp.micros),
'timestamp.to_rfc3339() -> ' + repr(timestamp.to_rfc3339()),
'timestamp.to_utc_datetime() -> ' + repr(timestamp.to_utc_datetime()),
'',
'# window',
'type(window) -> ' + repr(type(window)),
'window.start -> {} ({})'.format(
window.start, window.start.to_utc_datetime()),
'window.end -> {} ({})'.format(
window.end, window.end.to_utc_datetime()),
'window.max_timestamp() -> {} ({})'.format(
window.max_timestamp(), window.max_timestamp().to_utc_datetime()),
])
Recommended Posts