Touch Apache Beam in Python

Trying out Apache Beam with Python 2.7

Apache Beam is the programming framework behind Google's cloud service Dataflow, which runs pipelines written with Beam. Its selling point is that batch processing and stream processing are written in the same way. This time I wrote a minimal program in Python and ran it.

Apache Beam installation prerequisites

- The prerequisite is Python 2.7.
- If you only have Python 3, or do not use Python at all, prepare a Python 2.7 environment first.
  - I only had a Python 3 conda environment, so I created a Python 2.7 environment with conda. Reference: Switch between Python2 and Python3 and use jupyter notebook
- Installing with Virtualenv is covered in the official Beam documentation: Apache Beam Python SDK Quickstart
- If you use Google's GCP, see the GCP documentation: Quick Start with Python
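Because everything below assumes a Python 2.7 interpreter, it can help to confirm which interpreter the active environment actually runs before installing anything. This is a minimal sketch; the helper name `is_python27` is hypothetical and not part of Beam.

```python
import sys


def is_python27(version_info=None):
    """Return True if the given (or the running) interpreter is Python 2.7.x.

    Hypothetical helper: the 2.7 requirement reflects the Beam Python SDK
    at the time this post was written.
    """
    if version_info is None:
        version_info = sys.version_info
    # Compare only the major and minor version numbers
    return tuple(version_info[:2]) == (2, 7)


if __name__ == "__main__":
    print("Python %d.%d.%d" % tuple(sys.version_info[:3]))
    print("Python 2.7 environment: %s" % is_python27())
```

Run it with the same `python` that `pip` will install into. Otherwise the check may describe a different environment.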

Install Apache Beam

- After activating the Python 2.7 environment, run the following.
  - Incidentally, in my environment the installation under Python 3.5 stopped with a Cython error.

pip install apache-beam
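To confirm that the package was installed into the environment you expect, you can try the import and report the installed version. This is a sketch: `installed_beam_version` is a hypothetical helper, and the only Beam attribute it relies on is `apache_beam.__version__`.

```python
def installed_beam_version():
    """Return the installed Apache Beam version string, or None if it is missing."""
    try:
        import apache_beam
    except ImportError:
        # pip installed into a different environment, or not at all
        return None
    return apache_beam.__version__


if __name__ == "__main__":
    version = installed_beam_version()
    if version is None:
        print("apache-beam is not installed in this environment")
    else:
        print("apache-beam %s" % version)
```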

Writing a minimal program

- I skimmed the Python programming guide and combined about three of its code snippets into the program below, which does the following:

  1. Take four lines of Shakespeare as input
  2. Count the characters in each line with the len function
  3. Print each count to standard output and finish (a real pipeline would normally write to a database or similar; that step is omitted here)
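Without Beam, the same three steps are just a loop over a list. The plain-Python sketch below is an illustration, not how Beam executes a pipeline. It is useful for knowing which counts the pipeline should produce. The names `LINES` and `line_lengths` are hypothetical.

```python
# The same four lines used as the pipeline input (step 1)
LINES = [
    'To be, or not to be: that is the question: ',
    'Whether \'tis nobler in the mind to suffer ',
    'The slings and arrows of outrageous fortune, ',
    'Or to take arms against a sea of troubles, ',
]


def line_lengths(lines):
    """Step 2: count the characters in each line (trailing space included)."""
    return [len(line) for line in lines]


if __name__ == "__main__":
    # Step 3: print each count to standard output
    for count in line_lengths(LINES):
        print(count)
```

Each count includes the trailing space at the end of its line. That space is why the first line counts as 43 characters rather than 42.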
from __future__ import print_function  # makes print() behave the same on Python 2.7

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# First, create the pipeline (with no runner specified, it runs on the local DirectRunner)
p = beam.Pipeline(options=PipelineOptions())

# 1. Use an in-memory list of four lines as the pipeline input
lines = (p
       | beam.Create([
           'To be, or not to be: that is the question: ',
           'Whether \'tis nobler in the mind to suffer ',
           'The slings and arrows of outrageous fortune, ',
           'Or to take arms against a sea of troubles, ']))

# 2. Transform each line into its character count
line_lengths = lines | beam.Map(len)

# 3. Finally, print each count to standard output
class PrintFn(beam.DoFn):
  def process(self, element):
    print(element)  # returns None, so this DoFn emits no output elements

line_lengths | beam.ParDo(PrintFn())
p.run()

- Result of running it in Jupyter (also published as a Gist):

<apache_beam.runners.direct.direct_runner.DirectPipelineResult at 0x7f6b69c92950>
43
42
45
43
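Step 2 uses `beam.Map`, while step 3 uses `beam.ParDo` with a DoFn. Roughly speaking, Map emits exactly one output per input element. A DoFn's `process` can emit zero or more outputs, and returning None emits nothing. The DoFn in step 3 only prints, so it contributes no elements downstream. The toy model below illustrates that difference in plain Python. It is an assumption-level sketch, not Beam's implementation, and all function names in it are hypothetical.

```python
def apply_map(fn, elements):
    """Toy model of Map: exactly one output per input element."""
    return [fn(element) for element in elements]


def apply_pardo(process, elements):
    """Toy model of ParDo: process() returns an iterable of outputs, or None."""
    outputs = []
    for element in elements:
        result = process(element)
        if result is not None:  # None means "emit nothing"
            outputs.extend(result)
    return outputs


def print_element(element):
    """Like the DoFn in step 3: prints and returns None, so it emits nothing."""
    print(element)


def split_words(element):
    """A DoFn-style generator that emits zero or more words per line."""
    for word in element.split():
        yield word


if __name__ == "__main__":
    lines = ["To be, or not to be", "that is the question"]
    lengths = apply_map(len, lines)
    print(lengths)
    print(apply_pardo(print_element, lengths))  # prints each length, then []
    print(apply_pardo(split_words, lines))
```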

Summary

- It was simple, but I was able to run an Apache Beam batch job from Python. It took about an hour from installation through writing and running the program.
- Next I want to try streaming processing. It also seems that Spark Streaming and other engines can be used as the execution engine, so I would like to try that as well.
- I was happy that all of the above ran from Bash on Windows plus Jupyter on Windows.
