Dieser Artikel fasst die vom Apache Beam Python SDK bereitgestellten Transformationen zusammen. Wer weiß, welche Transformationen bereits fertig zum Aufruf bereitstehen, kann die Implementierung meiner Meinung nach schneller planen.
ParDo: Führt für jedes Element der PCollection eine Verarbeitung (DoFn) aus.
test_par_do.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class ComputeWordLength(beam.DoFn):
    """DoFn that emits the length of every input element.

    No custom ``__init__`` is needed: the inherited ``beam.DoFn``
    constructor is sufficient because this DoFn keeps no state.
    """

    def process(self, element):
        """Yield ``len(element)`` as the single output for *element*."""
        yield len(element)
class TestParDo(TestCase):
    """ParDo runs a DoFn on every element of a PCollection."""

    def test_par_do(self):
        words = ['Alice', 'Bob', 'Cameron', 'Daniele', 'Ellen']
        lengths = [5, 3, 7, 7, 5]
        with TestPipeline() as pipeline:
            created = pipeline | beam.Create(words)
            word_lengths = created | beam.ParDo(ComputeWordLength())
            assert_that(word_lengths, equal_to(lengths))
Filter: Filtert die Elemente der PCollection und behält nur diejenigen, für die die Bedingung erfüllt ist.
test_filter.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestFilter(TestCase):
    """Filter keeps only the elements for which the predicate returns True."""

    def test_filter(self):
        letters = ['A', 'B', 'C']
        with TestPipeline() as pipeline:
            kept = (
                pipeline
                | beam.Create(letters)
                | beam.Filter(lambda letter: letter.startswith('A'))
            )
            assert_that(kept, equal_to(['A']))
Map: Wendet eine Funktion auf jedes Element der PCollection an und gibt pro Element genau ein Ergebnis aus.
test_map.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestMap(TestCase):
    """Map applies a function to every element and emits exactly one result each."""

    def test_map(self):
        expected = [5, 3, 7, 7, 5]
        inputs = ['Alice', 'Bob', 'Cameron', 'Daniele', 'Ellen']
        with TestPipeline() as p:
            # The builtin ``len`` is passed directly; wrapping it in a lambda
            # adds nothing.
            actual = (p
                      | beam.Create(inputs)
                      | beam.Map(len))
            assert_that(actual, equal_to(expected))
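FlatMap: Wendet auf jedes Element der PCollection eine Funktion an, die ein iterierbares Objekt zurückgibt, und fügt alle Ergebnisse zu einer einzigen (flachen) PCollection zusammen.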
test_flat_map.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestFlatMap(TestCase):
    """FlatMap emits every item of the iterable returned for each element."""

    def test_flat_map(self):
        word_groups = [['Alice', 'Bob'], ['Cameron', 'Daniele', 'Ellen']]
        flat_lengths = [5, 3, 7, 7, 5]
        with TestPipeline() as pipeline:
            lengths = (
                pipeline
                | beam.Create(word_groups)
                # One list of lengths per group; FlatMap flattens the lists.
                | beam.FlatMap(lambda group: [len(word) for word in group])
            )
            assert_that(lengths, equal_to(flat_lengths))
ToString: Konvertiert jedes Element der PCollection in eine Zeichenfolge.
test_to_string.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestToString(TestCase):
    """Examples for the three ``beam.ToString`` variants."""

    def _assert_transform(self, inputs, transform, expected):
        """Run *transform* over ``Create(inputs)`` and assert the output."""
        with TestPipeline() as pipeline:
            output = pipeline | beam.Create(inputs) | transform
            assert_that(output, equal_to(expected))

    def test_to_string_kvs(self):
        """Join each (key, value) pair into a comma-delimited string."""
        self._assert_transform(
            [('A', 'B'), ('C', 'D')],
            beam.ToString.Kvs(),
            ['A,B', 'C,D'],
        )

    def test_to_string_element(self):
        """Convert each element to its string representation."""
        self._assert_transform(
            ['A', ['A', 'B'], ['C', 'D', 'E']],
            beam.ToString.Element(),
            ["A", "['A', 'B']", "['C', 'D', 'E']"],
        )

    def test_to_string_iterables(self):
        """Join the items of each iterable into a comma-delimited string."""
        self._assert_transform(
            [['A', 'B'], ['C', 'D', 'E']],
            beam.ToString.Iterables(),
            ['A,B', 'C,D,E'],
        )
Keys: Extrahiert den Schlüssel aus jedem Element der PCollection (Schlüssel-Wert-Paare).
test_keys.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestKeys(TestCase):
    """Keys emits only the key of every (key, value) element."""

    def test_keys(self):
        days = ['Sunday', 'Monday', 'Tuesday', 'Wednesday',
                'Thursday', 'Friday', 'Saturday']
        # enumerate yields (0, 'Sunday'), (1, 'Monday'), ... (6, 'Saturday').
        pairs = list(enumerate(days))
        with TestPipeline() as pipeline:
            keys = pipeline | beam.Create(pairs) | beam.Keys()
            assert_that(keys, equal_to(list(range(7))))
Values: Extrahiert den Wert aus jedem Element der PCollection (Schlüssel-Wert-Paare).
test_values.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestValues(TestCase):
    """Values emits only the value of every (key, value) element."""

    def test_values(self):
        days = ['Sunday', 'Monday', 'Tuesday', 'Wednesday',
                'Thursday', 'Friday', 'Saturday']
        # enumerate yields (0, 'Sunday'), (1, 'Monday'), ... (6, 'Saturday').
        pairs = list(enumerate(days))
        with TestPipeline() as pipeline:
            values = pipeline | beam.Create(pairs) | beam.Values()
            assert_that(values, equal_to(days))
KvSwap: Vertauscht Schlüssel und Wert jedes Elements der PCollection (Schlüssel-Wert-Paare).
test_kv_swap.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestKvSwap(TestCase):
    """KvSwap turns every (key, value) element into (value, key)."""

    def test_kv_swap(self):
        days = ['Sunday', 'Monday', 'Tuesday', 'Wednesday',
                'Thursday', 'Friday', 'Saturday']
        pairs = list(enumerate(days))
        # Sorted alphabetically by day: ('Friday', 5), ('Monday', 1), ...
        swapped_pairs = sorted((day, index) for index, day in pairs)
        with TestPipeline() as pipeline:
            swapped = pipeline | beam.Create(pairs) | beam.KvSwap()
            assert_that(swapped, equal_to(swapped_pairs))
GroupByKey: Gruppiert die Elemente einer PCollection (Schlüssel-Wert-Paare) nach Schlüssel.
test_group_by_key.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestGroupByKey(TestCase):
    """GroupByKey collects all values that share a key into one element."""

    def test_group_by_key(self):
        # GroupByKey does not guarantee the order of the values inside a
        # group, and the values arrive as an iterable. Sorting them into a
        # list makes the comparison deterministic.
        expected = [('cat', ['mike', 'tama']), ('dog', ['pochi'])]
        inputs = [('cat', 'tama'), ('cat', 'mike'), ('dog', 'pochi')]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.GroupByKey()
                      | beam.Map(lambda kv: (kv[0], sorted(kv[1]))))
            assert_that(actual, equal_to(expected))
CoGroupByKey: Gruppiert die Elemente mehrerer PCollections (Schlüssel-Wert-Paare) gemeinsam nach Schlüssel.
test_co_group_by_key.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestCoGroupByKey(TestCase):
    """CoGroupByKey joins several keyed PCollections on their keys.

    For a tuple of input PCollections, each output element is
    ``(key, (values_from_pcol1, values_from_pcol2))``. A key that is missing
    from one input gets an empty group for that input.
    """

    def test_co_group_by_key(self):
        expected = [
            ('amy', (['amy@example.com'], ['111-222-3333', '333-444-5555'])),
            ('julia', (['julia@example.com'], [])),
        ]
        inputs1 = [('amy', 'amy@example.com'), ('julia', 'julia@example.com')]
        inputs2 = [('amy', '111-222-3333'), ('amy', '333-444-5555')]
        with TestPipeline() as p:
            pcol1 = p | 'create pcol1' >> beam.Create(inputs1)
            pcol2 = p | 'create pcol2' >> beam.Create(inputs2)
            actual = ((pcol1, pcol2)
                      | beam.CoGroupByKey()
                      # The order of values inside each group is not
                      # guaranteed, so every group is sorted into a list.
                      | beam.Map(lambda kv: (
                          kv[0], tuple(sorted(group) for group in kv[1]))))
            assert_that(actual, equal_to(expected))
CombineGlobally: Kombiniert alle Elemente einer PCollection zu einem einzigen Wert.
test_combine_globally.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestCombineGlobally(TestCase):
    """CombineGlobally reduces the whole PCollection to a single value."""

    def test_combine_globally(self):
        numbers = list(range(1, 11))  # 1 .. 10
        with TestPipeline() as pipeline:
            total = pipeline | beam.Create(numbers) | beam.CombineGlobally(sum)
            assert_that(total, equal_to([55]))
ToList: Speichert alle Elemente der PCollection in einer einzigen Liste.
test_to_list.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestToList(TestCase):
    """ToList gathers all elements of the PCollection into one list."""

    def test_to_list(self):
        expected = [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]
        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.ToList()
                      # ToList does not guarantee element order inside the
                      # list, so sort before comparing.
                      | beam.Map(sorted))
            assert_that(actual, equal_to(expected))
ToDict: Speichert alle Elemente der PCollection (Schlüssel-Wert-Paare) in einem Wörterbuch (dict).
test_to_dict.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestToDict(TestCase):
    """ToDict gathers all (key, value) elements into one dict."""

    def test_to_dict(self):
        # The key 'A' occurs twice. Only one of its values ends up in the
        # dict, and which one is not deterministic, so a fixed expected value
        # such as {'A': 2, 'B': 1} is flaky. The matcher accepts either.
        inputs = [('A', 1), ('A', 2), ('B', 1)]

        def check_dict(actual):
            """Assert exactly one dict with keys A/B, B == 1 and A in {1, 2}."""
            if len(actual) != 1:
                raise AssertionError('expected one dict, got %r' % (actual,))
            result = actual[0]
            if (set(result) != {'A', 'B'}
                    or result['B'] != 1
                    or result['A'] not in (1, 2)):
                raise AssertionError('unexpected dict: %r' % (result,))

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.ToDict())
            assert_that(actual, check_dict)
Count: Zählt die Elemente in der PCollection.
test_count.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestCount(TestCase):
    """Count.Globally emits the number of elements in the PCollection."""

    def test_count(self):
        numbers = list(range(1, 11))  # ten elements
        with TestPipeline() as pipeline:
            count = (
                pipeline
                | beam.Create(numbers)
                | beam.combiners.Count.Globally()
            )
            assert_that(count, equal_to([10]))
Distinct: Entfernt Duplikate aus den Elementen der PCollection.
test_distinct.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestDistinct(TestCase):
    """Distinct drops duplicate elements."""

    def test_distinct(self):
        with_duplicates = [1, 1, 2, 3]
        unique_values = [1, 2, 3]
        with TestPipeline() as pipeline:
            deduplicated = (
                pipeline | beam.Create(with_duplicates) | beam.Distinct()
            )
            assert_that(deduplicated, equal_to(unique_values))
Mean: Berechnet den Durchschnitt aller Elemente in der PCollection.
test_mean.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestMean(TestCase):
    """Mean.Globally emits the arithmetic mean of all elements."""

    def test_mean(self):
        numbers = list(range(1, 11))  # 1 .. 10, mean 5.5
        with TestPipeline() as pipeline:
            mean = (
                pipeline
                | beam.Create(numbers)
                | beam.combiners.Mean.Globally()
            )
            assert_that(mean, equal_to([5.5]))
Sample: Wählt zufällig eine feste Anzahl von Elementen aus der PCollection aus.
test_sample.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestSample(TestCase):
    """Sample.FixedSizeGlobally picks a fixed number of random elements."""

    def test_sample(self):
        # The sampled values differ from run to run, so a fixed expected
        # value such as [[2, 8, 6]] fails almost always. Check the invariants
        # instead: one list of 3 distinct elements, all drawn from the input.
        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        sample_size = 3
        population = set(inputs)

        def check_sample(actual):
            """Assert exactly one sample of distinct input elements."""
            if len(actual) != 1:
                raise AssertionError('expected one sample, got %r' % (actual,))
            sample = actual[0]
            if (len(sample) != sample_size
                    or len(set(sample)) != sample_size
                    or not set(sample) <= population):
                raise AssertionError('unexpected sample: %r' % (sample,))

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Sample.FixedSizeGlobally(sample_size))
            assert_that(actual, check_sample)
Top: Extrahiert die n größten (oder kleinsten) Elemente der PCollection.
test_top.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestTop(TestCase):
    """Top.Largest / Top.Smallest emit the n extreme elements as one list."""

    def _assert_top(self, transform, expected):
        """Apply *transform* to the numbers 1..10 and assert the output."""
        with TestPipeline() as pipeline:
            top = pipeline | beam.Create(list(range(1, 11))) | transform
            assert_that(top, equal_to(expected))

    def test_top_largest(self):
        # Largest returns the elements in descending order.
        self._assert_top(beam.combiners.Top.Largest(3), [[10, 9, 8]])

    def test_top_smallest(self):
        # Smallest returns the elements in ascending order.
        self._assert_top(beam.combiners.Top.Smallest(3), [[1, 2, 3]])
Flatten: Führt mehrere PCollections zu einer einzigen PCollection zusammen.
test_flatten.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestFlatten(TestCase):
    """Flatten merges several PCollections into one."""

    def test_flatten(self):
        first_half = list(range(1, 6))    # 1 .. 5
        second_half = list(range(6, 11))  # 6 .. 10
        with TestPipeline() as pipeline:
            left = pipeline | 'create pcol1' >> beam.Create(first_half)
            right = pipeline | 'create pcol2' >> beam.Create(second_half)
            merged = (left, right) | beam.Flatten()
            assert_that(merged, equal_to(first_half + second_half))
Reshuffle: Verteilt die Elemente der PCollection neu auf die Worker.
test_reshuffle.py
from unittest import TestCase
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class TestReshuffle(TestCase):
    """Reshuffle redistributes elements without changing their values."""

    def test_reshuffle(self):
        letters = ['A', 'B', 'C']
        with TestPipeline() as pipeline:
            reshuffled = pipeline | beam.Create(letters) | beam.Reshuffle()
            # Only the distribution across workers changes, not the content.
            assert_that(reshuffled, equal_to(['A', 'B', 'C']))
Das Apache Beam Python SDK bietet eine Fülle von Transformationen (wenn auch weniger als das Java SDK). Ich werde diesen Artikel aktualisieren, sobald neue Funktionen hinzukommen.
Ich hoffe, dieser Überblick hilft Ihnen, wenn Sie nachschlagen möchten, welche Transformationen Apache Beam bietet.
Recommended Posts