Apache Beam Cheat Sheet [Python]

Introduction

This article summarizes the transforms provided by the Apache Beam Python SDK. In my opinion, knowing which transforms are readily available lets you plan an implementation faster.

Element-wise processing | Element-wise

ParDo - Run a DoFn

Applies a processing function (DoFn) to each element of the PCollection. A DoFn can emit zero, one, or several output elements per input element.

test_par_do.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class ComputeWordLength(beam.DoFn):

    def __init__(self):
        super().__init__()

    def process(self, element):
        yield len(element)


class TestParDo(TestCase):

    def test_par_do(self):
        expected = [5, 3, 7, 7, 5]

        inputs = ['Alice', 'Bob', 'Cameron', 'Daniele', 'Ellen']

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ParDo(ComputeWordLength()))

            assert_that(actual, equal_to(expected))

Filter - Filter elements

Keeps only the elements of the PCollection for which the given predicate returns True.

test_filter.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestFilter(TestCase):

    def test_filter(self):
        expected = ['A']

        inputs = ['A', 'B', 'C']

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Filter(lambda element: element.startswith('A')))

            assert_that(actual, equal_to(expected))

Map - Apply a function to each element

Applies a function to each element of the PCollection and emits exactly one output element per input element.

test_map.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestMap(TestCase):

    def test_map(self):
        expected = [5, 3, 7, 7, 5]

        inputs = ['Alice', 'Bob', 'Cameron', 'Daniele', 'Ellen']

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Map(lambda element: len(element)))

            assert_that(actual, equal_to(expected))

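FlatMap - Apply a function to each element and flatten the results

Applies a function that returns an iterable to each element of the PCollection, then flattens the returned iterables into a single PCollection, so each input element can produce zero or more output elements.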

test_flat_map.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestFlatMap(TestCase):

    def test_flat_map(self):
        expected = [5, 3, 7, 7, 5]

        inputs = [['Alice', 'Bob'], ['Cameron', 'Daniele', 'Ellen']]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.FlatMap(lambda element: [len(e) for e in element]))

            assert_that(actual, equal_to(expected))

ToString - Convert elements to strings

Converts each element of the PCollection to a string.

test_to_string.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestToString(TestCase):

    def test_to_string_kvs(self):
        """Key/value pairs to delimited strings."""
        expected = ['A,B', 'C,D']

        inputs = [('A', 'B'), ('C', 'D')]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ToString.Kvs())

            assert_that(actual, equal_to(expected))

    def test_to_string_element(self):
        """Each element to its string representation."""
        expected = ["A", "['A', 'B']", "['C', 'D', 'E']"]

        inputs = ['A', ['A', 'B'], ['C', 'D', 'E']]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ToString.Element())

            assert_that(actual, equal_to(expected))

    def test_to_string_iterables(self):
        """Iterables to delimited strings."""
        expected = ['A,B', 'C,D,E']

        inputs = [['A', 'B'], ['C', 'D', 'E']]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ToString.Iterables())

            assert_that(actual, equal_to(expected))

Keys - Extract keys from elements

Extracts the key from each element (key/value pair) of the PCollection.

test_keys.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestKeys(TestCase):

    def test_keys(self):
        expected = [0, 1, 2, 3, 4, 5, 6]

        inputs = [(0, 'Sunday'), (1, 'Monday'), (2, 'Tuesday'), (3, 'Wednesday'),
                  (4, 'Thursday'), (5, 'Friday'), (6, 'Saturday')]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Keys())

            assert_that(actual, equal_to(expected))

Values - Extract values from elements

Extracts the value from each element (key/value pair) of the PCollection.

test_values.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestValues(TestCase):

    def test_values(self):
        expected = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday']

        inputs = [(0, 'Sunday'), (1, 'Monday'), (2, 'Tuesday'), (3, 'Wednesday'),
                  (4, 'Thursday'), (5, 'Friday'), (6, 'Saturday')]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Values())

            assert_that(actual, equal_to(expected))

KvSwap - Swap keys and values

Swaps the key and the value of each element (key/value pair) of the PCollection.

test_kv_swap.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestKvSwap(TestCase):

    def test_kv_swap(self):
        expected = [('Friday', 5), ('Monday', 1), ('Saturday', 6), ('Sunday', 0),
                    ('Thursday', 4), ('Tuesday', 2), ('Wednesday', 3)]

        inputs = [(0, 'Sunday'), (1, 'Monday'), (2, 'Tuesday'), (3, 'Wednesday'),
                  (4, 'Thursday'), (5, 'Friday'), (6, 'Saturday')]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.KvSwap())

            assert_that(actual, equal_to(expected))

Aggregation processing | Aggregation

GroupByKey - Group elements by key

Groups the elements of a PCollection (key/value pairs) by key.

test_group_by_key.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestGroupByKey(TestCase):

    def test_group_by_key(self):
        expected = [('cat', ['tama', 'mike']), ('dog', ['pochi'])]

        inputs = [('cat', 'tama'), ('cat', 'mike'), ('dog', 'pochi')]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.GroupByKey())

            assert_that(actual, equal_to(expected))

CoGroupByKey - Group elements by key (multiple PCollections)

Groups the elements of multiple PCollections (key/value pairs) by key.

test_co_group_by_key.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestCoGroupByKey(TestCase):

    def test_co_group_by_key(self):
        expected = [
            ('amy', (['amy@example.com'], ['111-222-3333', '333-444-5555'])),
            ('julia', (['julia@example.com'], []))
        ]

        inputs1 = [('amy', 'amy@example.com'), ('julia', 'julia@example.com')]
        inputs2 = [('amy', '111-222-3333'), ('amy', '333-444-5555')]

        with TestPipeline() as p:
            pcol1 = p | 'create pcol1' >> beam.Create(inputs1)
            pcol2 = p | 'create pcol2' >> beam.Create(inputs2)

            actual = ((pcol1, pcol2)
                      | beam.CoGroupByKey())

            assert_that(actual, equal_to(expected))

CombineGlobally - Combine elements

Combines all elements of a PCollection into a single value.

combine_globally.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestCombineGlobally(TestCase):

    def test_combine_globally(self):
        expected = [55]

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.CombineGlobally(sum))

            assert_that(actual, equal_to(expected))

ToList - Collect elements into a list

Collects all elements of the PCollection into a single list.

test_to_list.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestToList(TestCase):

    def test_to_list(self):
        expected = [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.ToList())

            assert_that(actual, equal_to(expected))

ToDict - Collect elements into a dictionary

Collects all elements (key/value pairs) of the PCollection into a single dictionary.

test_to_dict.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestToDict(TestCase):

    def test_to_dict(self):
        expected = [{'A': 2, 'B': 1}]  # For a duplicated key only one value is kept; which one is not guaranteed

        inputs = [('A', 1), ('A', 2), ('B', 1)]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.ToDict())

            assert_that(actual, equal_to(expected))

Count - Count the elements

Counts the number of elements in the PCollection.

test_count.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestCount(TestCase):

    def test_count(self):
        expected = [10]

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Count.Globally())

            assert_that(actual, equal_to(expected))

Distinct - Deduplicate elements

Removes duplicate elements from the PCollection.

test_distinct.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestDistinct(TestCase):

    def test_distinct(self):
        expected = [1, 2, 3]

        inputs = [1, 1, 2, 3]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Distinct())

            assert_that(actual, equal_to(expected))

Mean - Compute the mean of elements

Computes the average of all elements in the PCollection.

test_mean.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestMean(TestCase):

    def test_mean(self):
        expected = [5.5]

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Mean.Globally())

            assert_that(actual, equal_to(expected))

Sample - Randomly sample elements

Randomly selects a fixed number of elements from all elements of the PCollection.

test_sample.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestSample(TestCase):

    def test_sample(self):
        # The sample is random, so the test only checks its size
        expected = [3]

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Sample.FixedSizeGlobally(3)
                      | beam.Map(len))  # FixedSizeGlobally emits a single list of 3 elements

            assert_that(actual, equal_to(expected))

Top - Extract the largest (or smallest) elements

Extracts the n largest (or smallest) of all elements in the PCollection.

test_top.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestTop(TestCase):

    def test_top_largest(self):
        expected = [[10, 9, 8]]

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Top.Largest(3))

            assert_that(actual, equal_to(expected))

    def test_top_smallest(self):
        expected = [[1, 2, 3]]

        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Top.Smallest(3))

            assert_that(actual, equal_to(expected))

Other processing | Other

Flatten - Merge PCollections

Merges multiple PCollections into a single PCollection.

test_flatten.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestFlatten(TestCase):

    def test_flatten(self):
        expected = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        inputs1 = [1, 2, 3, 4, 5]
        inputs2 = [6, 7, 8, 9, 10]

        with TestPipeline() as p:
            pcol1 = p | 'create pcol1' >> beam.Create(inputs1)
            pcol2 = p | 'create pcol2' >> beam.Create(inputs2)

            actual = (pcol1, pcol2) | beam.Flatten()

            assert_that(actual, equal_to(expected))

Reshuffle - Redistribute elements

Redistributes the elements of the PCollection across workers.

test_reshuffle.py


from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestReshuffle(TestCase):

    def test_reshuffle(self):
        expected = ['A', 'B', 'C']

        inputs = ['A', 'B', 'C']

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Reshuffle())

            assert_that(actual, equal_to(expected))

Summary

The Apache Beam Python SDK offers a rich set of transforms (although fewer than the Java SDK). I plan to update this article as new features are released.

I hope it serves as a handy reference whenever you want to recall the Apache Beam transforms.
