Easily try Amazon EMR / Cloud Dataproc with Python [mrjob]

Introduction

I was wondering whether I could easily build a MapReduce application in Python, the language I'm used to, and run it on **Amazon EMR**, and that is how I came across the Python framework **mrjob**. There are other options such as PySpark, but my impression is that mrjob has a lower learning cost and is easier to implement with. So in this article I describe how to run an application built with mrjob on Amazon EMR. I'll also try running it on GCP's **Cloud Dataproc**.

mrjob overview

(mrjob logo)

mrjob is a framework that lets you write applications in Python and run them on a Hadoop cluster. **You can easily run MapReduce applications locally or in the cloud without deep Hadoop expertise.** Internally, mrjob runs on Hadoop Streaming. If you are wondering what Hadoop is in the first place, an overview is given in a separate article, so please refer to it if you like.

MapReduce application

A MapReduce application normally splits the input dataset and runs **Map processing**, **(Shuffle processing)**, and **Reduce processing** over it. You can also add an optional **Combine processing** step, which performs an intermediate aggregation of the Map output before it is passed to the Reduce step.

The application implemented this time is a program that counts how many times each word appears in a document. It expects input like the following.

input.txt


We are the world, we are the children
We are the ones who make a brighter day

Map processing

Takes one key-value pair per input line (here the key is None and the value is the line) and returns zero or more key-value pairs.

Input


Input: (None, "we are the world, we are the children")

Output


Output:

"we", 1
"are", 1
"the", 1
"world", 1
"we", 1
"are", 1
"the", 1
"children", 1

Combine processing

Takes a key and the partial list of values produced for it by a single Map task, and returns zero or more key-value pairs.

Input


Input: ("we", [1, 1])

Output


Output:

"we", 2

Reduce processing

Takes a key and the full list of values for that key, and returns zero or more key-value pairs.

Input


Input: ("we", [2, 1])  # "we" appeared twice in line 1 and once in line 2

Output


Output:

"we", 3
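The flow above can be sketched in plain Python, with no Hadoop or mrjob required, to see how Map, Shuffle, and Reduce fit together on the sample input (the combiner is omitted here since it only pre-aggregates what Reduce would sum anyway):

```python
import re
from collections import defaultdict

WORD_RE = re.compile(r"[\w']+")

lines = [
    "We are the world, we are the children",
    "We are the ones who make a brighter day",
]

# Map: each line yields (word, 1) pairs
mapped = [(w.lower(), 1) for line in lines for w in WORD_RE.findall(line)]

# Shuffle: group all values by key
grouped = defaultdict(list)
for word, count in mapped:
    grouped[word].append(count)

# Reduce: sum the counts for each word
counts = {word: sum(values) for word, values in grouped.items()}

print(counts["we"])   # 3
print(counts["the"])  # 3
```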

Environment

Build an environment for creating MapReduce applications in Python.

Installing mrjob

You can install it from PyPI. This article uses version **0.7.1**.

pip install mrjob==0.7.1

Implementation of MapReduce

Now, let's implement the above processing with mrjob. If the job is simple enough to be described in a single step, create a class that inherits from **mrjob.job.MRJob** as follows.

mr_word_count.py


import re

from mrjob.job import MRJob

WORD_RE = re.compile(r"[\w']+")


class MRWordCount(MRJob):

    # Process each input line and emit a (word, 1) pair per word
    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    # Take a word and a partial list of its counts and pre-aggregate them
    def combiner(self, word, counts):
        yield word, sum(counts)

    # Take a word and the full list of its counts and sum them
    def reducer(self, word, counts):
        yield word, sum(counts)


if __name__ == '__main__':
    MRWordCount.run()

If the processing is more complicated and needs multiple steps, you can define it by returning **mrjob.step.MRStep** objects from MRJob's **steps** method, as follows:

mr_word_count.py


import re

from mrjob.job import MRJob
from mrjob.step import MRStep

WORD_RE = re.compile(r"[\w']+")


class MRWordCount(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(mapper=...,
                   combiner=...,
                   reducer=...),
            ...
        ]

    def mapper_get_words(self, _, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def combiner_count_words(self, word, counts):
        yield word, sum(counts)

    def reducer_count_words(self, word, counts):
        yield word, sum(counts)


if __name__ == '__main__':
    MRWordCount.run()
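To see why chaining steps can be useful, here is a plain-Python sketch (not the mrjob API) of a hypothetical second step that consumes the word counts produced by the first step and picks the most frequent word:

```python
import re
from collections import Counter

WORD_RE = re.compile(r"[\w']+")

lines = [
    "We are the world, we are the children",
    "We are the ones who make a brighter day",
]

# Step 1: word count, as MRWordCount's mapper/reducer would produce it
counts = Counter(w.lower() for line in lines for w in WORD_RE.findall(line))

# Step 2: a second "reduce" pass over step 1's output that selects the
# word with the highest count (ties broken by word, as tuples compare)
most_common = max((count, word) for word, count in counts.items())

print(most_common)  # (3, 'we')
```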

Run MapReduce

Let's try running the application locally, on Amazon EMR, and on Cloud Dataproc. The package structure looks like this.

Package configuration


.
├── config.yaml
├── input.txt
└── src
    ├── __init__.py
    └── mr_word_count.py

When running the application, you can specify options such as the input path. There are two ways to do this: **writing them in a config file in advance** or **passing them from the console**. If the same option is specified in both places, **the value passed from the console takes precedence.** Config files can be written in YAML or JSON format.
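As a simplified illustration of this precedence rule, the merge can be thought of as a config-file dict overridden by any value actually passed on the console (this is a hypothetical helper, not mrjob's actual option-resolution code):

```python
def resolve_options(config_file_opts, console_opts):
    """Console options override config-file options for any key set in both."""
    merged = dict(config_file_opts)
    # Keys the console left unset (None) fall back to the config file
    merged.update({k: v for k, v in console_opts.items() if v is not None})
    return merged


config = {"instance_type": "c1.medium", "num_core_instances": 4}
console = {"instance_type": "m5.xlarge", "num_core_instances": None}

print(resolve_options(config, console))
# {'instance_type': 'm5.xlarge', 'num_core_instances': 4}
```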

To run an application created with mrjob, enter the following in the console:

python {Application path} -r {Execution environment} -c {Config file path} < {Input path} > {Output path}

local

When running locally, you can omit the execution environment and config file path if you don't need them. If you do not specify an output path, results are written to standard output.

python src/mr_word_count.py < input.txt

#If you need to pass the execution environment or the path of the Config file
python src/mr_word_count.py -r local -c config.yaml < input.txt

When you run the above command, the application executes and you will see results similar to the following on standard output:

"world" 1
"day"   1
"make"  1
"ones"  1
"the"   3
"we"    3
"who"   1
"brighter"      1
"children"      1
"a"     1
"are"   3

Amazon EMR

To run on Amazon EMR, you must set **AWS_ACCESS_KEY_ID** and **AWS_SECRET_ACCESS_KEY** in your environment variables or in the config file. You can also set the instance type and the number of core instances. The application and its associated files are uploaded to S3 before the job runs.

config.yaml


runners:
  emr:
    aws_access_key_id: <your key ID>
    aws_secret_access_key: <your secret>
    instance_type: c1.medium
    num_core_instances: 4

Other options can be found here (https://mrjob.readthedocs.io/en/latest/guides/emr-opts.html).

When running, specify **emr** as the environment.

python src/mr_word_count.py -r emr -c config.yaml input.txt --output-dir=s3://emr-test/output/

Cloud Dataproc

To use Cloud Dataproc, first enable the Dataproc API in GCP. Then specify the path of your credential file in the environment variable **GOOGLE_APPLICATION_CREDENTIALS**. In the config file, set the zone, instance type, and number of core instances. Other options can be found here (https://mrjob.readthedocs.io/en/latest/guides/dataproc-opts.html).

config.yaml


runners:
  dataproc:
    zone: us-central1-a
    instance_type: n1-standard-1
    num_core_instances: 2

When running, specify **dataproc** as the environment.

python src/mr_word_count.py -r dataproc -c config.yaml input.txt --output-dir=gs://dataproc-test/output/

Summary

By using mrjob, I was able to easily create a MapReduce application in Python and run it in the cloud with very little effort. mrjob also has extensive documentation, so it is easy to get started with. If you want a simple way to run MapReduce jobs from Python, it is a very useful framework.
