This article summarizes the Transforms provided by the Apache Beam Python SDK. Knowing which Transforms are readily available should help you plan an implementation more quickly.
Performs some processing (a DoFn) on each element of the PCollection.
test_par_do.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class ComputeWordLength(beam.DoFn):

    def __init__(self):
        super(ComputeWordLength, self).__init__()

    def process(self, element):
        yield len(element)


class TestParDo(TestCase):

    def test_par_do(self):
        expected = [5, 3, 7, 7, 5]
        inputs = ['Alice', 'Bob', 'Cameron', 'Daniele', 'Ellen']

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ParDo(ComputeWordLength()))

            assert_that(actual, equal_to(expected))
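A DoFn can also be parameterized through its constructor. The sketch below is my own addition, not part of the original tests; the AppendSuffix name and the '-san' suffix are made up for illustration.

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class AppendSuffix(beam.DoFn):
    """Hypothetical DoFn that appends a suffix given at construction time."""

    def __init__(self, suffix):
        self.suffix = suffix

    def process(self, element):
        yield element + self.suffix


with TestPipeline() as p:
    actual = (p
              | beam.Create(['Alice', 'Bob'])
              | beam.ParDo(AppendSuffix('-san')))

    assert_that(actual, equal_to(['Alice-san', 'Bob-san']))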
Filters the elements of the PCollection.
test_filter.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestFilter(TestCase):

    def test_filter(self):
        expected = ['A']
        inputs = ['A', 'B', 'C']

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Filter(lambda element: element.startswith('A')))

            assert_that(actual, equal_to(expected))
Applies a function to each element of the PCollection.
test_map.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestMap(TestCase):

    def test_map(self):
        expected = [5, 3, 7, 7, 5]
        inputs = ['Alice', 'Bob', 'Cameron', 'Daniele', 'Ellen']

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Map(lambda element: len(element)))

            assert_that(actual, equal_to(expected))
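Map also forwards any extra positional or keyword arguments to the callable, which is handy for parameterizing the function. A minimal sketch of my own (the suffix argument is just for illustration):

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


with TestPipeline() as p:
    # Extra arguments given to Map are passed on to the callable.
    actual = (p
              | beam.Create(['Alice', 'Bob'])
              | beam.Map(lambda element, suffix: element + suffix, suffix='!'))

    assert_that(actual, equal_to(['Alice!', 'Bob!']))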
Applies a function that returns an iterable to each element of the PCollection and flattens the results into a single PCollection.
test_flat_map.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestFlatMap(TestCase):

    def test_flat_map(self):
        expected = [5, 3, 7, 7, 5]
        inputs = [['Alice', 'Bob'], ['Cameron', 'Daniele', 'Ellen']]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.FlatMap(lambda element: [len(e) for e in element]))

            assert_that(actual, equal_to(expected))
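A common use of FlatMap is splitting lines into words, because each returned list is flattened into individual elements. A small sketch of my own:

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


with TestPipeline() as p:
    # Each list returned by the lambda is flattened into separate elements.
    actual = (p
              | beam.Create(['good morning', 'good evening'])
              | beam.FlatMap(lambda line: line.split(' ')))

    assert_that(actual, equal_to(['good', 'morning', 'good', 'evening']))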
Converts each element of the PCollection to a string.
test_to_string.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestToString(TestCase):

    def test_to_string_kvs(self):
        """Joins each Key/Value pair into a delimited string."""
        expected = ['A,B', 'C,D']
        inputs = [('A', 'B'), ('C', 'D')]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ToString.Kvs())

            assert_that(actual, equal_to(expected))

    def test_to_string_element(self):
        """Converts each element into a string."""
        expected = ["A", "['A', 'B']", "['C', 'D', 'E']"]
        inputs = ['A', ['A', 'B'], ['C', 'D', 'E']]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ToString.Element())

            assert_that(actual, equal_to(expected))

    def test_to_string_iterables(self):
        """Joins each iterable into a delimited string."""
        expected = ['A,B', 'C,D,E']
        inputs = [['A', 'B'], ['C', 'D', 'E']]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.ToString.Iterables())

            assert_that(actual, equal_to(expected))
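As far as I remember, Kvs() and Iterables() also accept a delimiter argument. The sketch below is my own and assumes that parameter exists, so double-check it against your SDK version.

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


with TestPipeline() as p:
    # Assumes ToString.Kvs accepts a delimiter argument.
    actual = (p
              | beam.Create([('A', 'B'), ('C', 'D')])
              | beam.ToString.Kvs(delimiter='-'))

    assert_that(actual, equal_to(['A-B', 'C-D']))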
Extract the Key from each element of the PCollection (Key / Value pair).
test_keys.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestKeys(TestCase):

    def test_keys(self):
        expected = [0, 1, 2, 3, 4, 5, 6]
        inputs = [(0, 'Sunday'), (1, 'Monday'), (2, 'Tuesday'), (3, 'Wednesday'),
                  (4, 'Thursday'), (5, 'Friday'), (6, 'Saturday')]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Keys())

            assert_that(actual, equal_to(expected))
Extracts a Value from each element of the PCollection (Key / Value pair).
test_values.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestValues(TestCase):

    def test_values(self):
        expected = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday']
        inputs = [(0, 'Sunday'), (1, 'Monday'), (2, 'Tuesday'), (3, 'Wednesday'),
                  (4, 'Thursday'), (5, 'Friday'), (6, 'Saturday')]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Values())

            assert_that(actual, equal_to(expected))
Swaps the Key and Value of each element of the PCollection (Key / Value pairs).
test_kv_swap.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestKvSwap(TestCase):

    def test_kv_swap(self):
        expected = [('Friday', 5), ('Monday', 1), ('Saturday', 6), ('Sunday', 0),
                    ('Thursday', 4), ('Tuesday', 2), ('Wednesday', 3)]
        inputs = [(0, 'Sunday'), (1, 'Monday'), (2, 'Tuesday'), (3, 'Wednesday'),
                  (4, 'Thursday'), (5, 'Friday'), (6, 'Saturday')]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.KvSwap())

            assert_that(actual, equal_to(expected))
Aggregate the elements of the PCollection (Key / Value pairs) by Key.
test_group_by_key.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestGroupByKey(TestCase):

    def test_group_by_key(self):
        expected = [('cat', ['tama', 'mike']), ('dog', ['pochi'])]
        inputs = [('cat', 'tama'), ('cat', 'mike'), ('dog', 'pochi')]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.GroupByKey())

            assert_that(actual, equal_to(expected))
Aggregate elements of multiple PCollections (Key / Value pairs) by Key.
test_co_group_by_key.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestCoGroupByKey(TestCase):

    def test_co_group_by_key(self):
        expected = [
            ('amy', (['[email protected]'], ['111-222-3333', '333-444-5555'])),
            ('julia', (['[email protected]'], []))
        ]
        inputs1 = [('amy', '[email protected]'), ('julia', '[email protected]')]
        inputs2 = [('amy', '111-222-3333'), ('amy', '333-444-5555')]

        with TestPipeline() as p:
            pcol1 = p | 'create pcol1' >> beam.Create(inputs1)
            pcol2 = p | 'create pcol2' >> beam.Create(inputs2)

            actual = ((pcol1, pcol2)
                      | beam.CoGroupByKey())

            assert_that(actual, equal_to(expected))
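CoGroupByKey also accepts a dict of named PCollections, in which case the grouped values are keyed by those names. The sketch below is my own; depending on the SDK version the grouped values may come back as iterables rather than plain lists, so the comparison may need adjusting.

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


expected = [
    ('amy', {'emails': ['[email protected]'], 'phones': ['111-222-3333']}),
]

with TestPipeline() as p:
    emails = p | 'create emails' >> beam.Create([('amy', '[email protected]')])
    phones = p | 'create phones' >> beam.Create([('amy', '111-222-3333')])

    # With a dict input, each result groups the values under the dict's names.
    actual = ({'emails': emails, 'phones': phones}
              | beam.CoGroupByKey())

    assert_that(actual, equal_to(expected))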
Combines all the elements of a PCollection into a single value.
combine_globally.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestCombineGlobally(TestCase):

    def test_combine_globally(self):
        expected = [55]
        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.CombineGlobally(sum))

            assert_that(actual, equal_to(expected))
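Besides a plain callable like sum, CombineGlobally accepts a beam.CombineFn with explicit accumulator methods. A minimal sketch of my own computing an average (the AverageFn name is mine, modeled on the standard CombineFn interface):

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class AverageFn(beam.CombineFn):
    """Hypothetical CombineFn that computes the mean of its inputs."""

    def create_accumulator(self):
        return 0.0, 0  # (running sum, count)

    def add_input(self, accumulator, input):
        total, count = accumulator
        return total + input, count + 1

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float('NaN')


with TestPipeline() as p:
    actual = (p
              | beam.Create([1, 2, 3, 4, 5])
              | beam.CombineGlobally(AverageFn()))

    assert_that(actual, equal_to([3.0]))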
Stores all the elements of the PCollection in one list.
test_to_list.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestToList(TestCase):

    def test_to_list(self):
        expected = [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]
        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.ToList())

            assert_that(actual, equal_to(expected))
Stores all elements of the PCollection (Key / Value pairs) in a single dictionary.
test_to_dict.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestToDict(TestCase):

    def test_to_dict(self):
        # If a key appears more than once, only one of its values is kept.
        expected = [{'A': 2, 'B': 1}]
        inputs = [('A', 1), ('A', 2), ('B', 1)]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.ToDict())

            assert_that(actual, equal_to(expected))
Count the number of elements in the PCollection.
test_count.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestCount(TestCase):

    def test_count(self):
        expected = [10]
        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Count.Globally())

            assert_that(actual, equal_to(expected))
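There are also per-element and per-key variants. A small sketch of my own using Count.PerElement, which counts how many times each distinct element appears:

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


with TestPipeline() as p:
    # Count.PerElement emits (element, occurrence count) pairs.
    actual = (p
              | beam.Create(['cat', 'dog', 'cat'])
              | beam.combiners.Count.PerElement())

    assert_that(actual, equal_to([('cat', 2), ('dog', 1)]))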
Eliminates duplicate elements from the PCollection.
test_distinct.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestDistinct(TestCase):

    def test_distinct(self):
        expected = [1, 2, 3]
        inputs = [1, 1, 2, 3]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Distinct())

            assert_that(actual, equal_to(expected))
Calculates the average of all elements in the PCollection.
test_mean.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestMean(TestCase):

    def test_mean(self):
        expected = [5.5]
        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Mean.Globally())

            assert_that(actual, equal_to(expected))
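A per-key variant exists as well. A small sketch of my own using Mean.PerKey:

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


with TestPipeline() as p:
    # Mean.PerKey averages the values for each key.
    actual = (p
              | beam.Create([('a', 1), ('a', 3), ('b', 5)])
              | beam.combiners.Mean.PerKey())

    assert_that(actual, equal_to([('a', 2.0), ('b', 5.0)]))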
Randomly samples a fixed number of elements from the PCollection.
test_sample.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestSample(TestCase):

    def test_sample(self):
        # The sampled elements are random, so this expected value differs on every run.
        expected = [[2, 8, 6]]
        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Sample.FixedSizeGlobally(3))

            assert_that(actual, equal_to(expected))
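Because the sampled values change on every run, a comparison against fixed values like the one above will not normally pass. One way to keep such a test deterministic is to assert only on the sample size; a sketch of my own:

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


with TestPipeline() as p:
    sampled = (p
               | beam.Create([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
               | beam.combiners.Sample.FixedSizeGlobally(3))

    # Assert only that three elements were sampled, not which ones.
    actual = sampled | beam.Map(len)
    assert_that(actual, equal_to([3]))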
Extracts the N largest (or smallest) elements of the PCollection.
test_top.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestTop(TestCase):

    def test_top_largest(self):
        expected = [[10, 9, 8]]
        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Top.Largest(3))

            assert_that(actual, equal_to(expected))

    def test_top_smallest(self):
        expected = [[1, 2, 3]]
        inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.combiners.Top.Smallest(3))

            assert_that(actual, equal_to(expected))
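Per-key variants also exist. A small sketch of my own using Top.LargestPerKey, which keeps the N largest values for each key:

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


with TestPipeline() as p:
    # Top.LargestPerKey emits (key, list of the N largest values) pairs.
    actual = (p
              | beam.Create([('a', 1), ('a', 5), ('a', 3), ('b', 2)])
              | beam.combiners.Top.LargestPerKey(2))

    assert_that(actual, equal_to([('a', [5, 3]), ('b', [2])]))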
Combine multiple PCollections into a single PCollection.
test_flatten.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestFlatten(TestCase):

    def test_flatten(self):
        expected = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        inputs1 = [1, 2, 3, 4, 5]
        inputs2 = [6, 7, 8, 9, 10]

        with TestPipeline() as p:
            pcol1 = p | 'create pcol1' >> beam.Create(inputs1)
            pcol2 = p | 'create pcol2' >> beam.Create(inputs2)

            actual = (pcol1, pcol2) | beam.Flatten()

            assert_that(actual, equal_to(expected))
Redistributes the elements of the PCollection among the workers.
test_reshuffle.py
from unittest import TestCase

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class TestReshuffle(TestCase):

    def test_reshuffle(self):
        expected = ['A', 'B', 'C']
        inputs = ['A', 'B', 'C']

        with TestPipeline() as p:
            actual = (p
                      | beam.Create(inputs)
                      | beam.Reshuffle())

            assert_that(actual, equal_to(expected))
The Apache Beam Python SDK provides a wealth of Transforms (though fewer than the Java SDK). I intend to update this article as new features are released.
I hope it serves as a handy reference whenever you need to recall an Apache Beam Transform.