Getting Started with Apache Hadoop on Docker

Introduction

Apache Beam is the first parallel and distributed processing framework I have used, but traditional terms and concepts such as MapReduce come up constantly while learning it. That is why I decided to get started with Apache Hadoop. This time I use Docker so that the environment can be built as easily as possible.


Hadoop overview

Hadoop is a **distributed processing framework** for handling large amounts of data, and it normally runs on **Linux**. Because it scales out well, you can improve performance by simply **adding more servers** as the amount of data grows.

Hadoop consists of two main systems:

- **HDFS (Hadoop Distributed File System):** A distributed file system. Large amounts of data are split up and stored across multiple servers, which users can treat as if they were one large file system.
- **MapReduce:** A framework for distributed processing. It is designed so that one large process (job) can be divided into multiple units (tasks) and executed in parallel on multiple servers.

Hadoop is not a single component; it works through the cooperation of multiple components such as HDFS and the MapReduce framework. **These core components of Hadoop are sometimes referred to collectively as the Hadoop ecosystem.**

HDFS (Hadoop Distributed File System)

On HDFS, large amounts of data are **divided into small units (blocks)** and placed on the file systems of multiple servers. For example, if the data size is 1 GB (1024 MB) and the block size is 64 MB, the data is split into 16 blocks and distributed across multiple servers.

By distributing data across multiple servers and processing it in parallel in this way, throughput can be expected to improve. **Communication between storage and servers is expensive**, so each server processes the data it holds locally as much as possible, and only at the end are the partial results transferred over the network and combined into a single result.

In addition, **each block is stored on multiple servers, so that data is not lost and processing does not fail even if one server goes down.**


When using HDFS, users don't have to worry about multiple servers running behind the scenes or how files are divided into blocks.
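
As an illustration of that single-file-system view, the same handful of commands work no matter how many servers are behind HDFS. The paths below are just examples, run in an environment like the one built later in this article:


#Copy a local file into HDFS and read it back, as if HDFS were one big file system
hadoop fs -put input.txt /user/hdfs/input/input.txt
hadoop fs -cat /user/hdfs/input/input.txt

#Peek at how HDFS actually split the file into blocks and where the replicas live
hdfs fsck /user/hdfs/input/input.txt -files -blocks -locations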

MapReduce

MapReduce divides one job into multiple tasks and executes them in parallel. MapReduce processing consists of three major phases: **Map**, **Shuffle**, and **Reduce**. Of these, **Shuffle** runs automatically, so you do not have to define it yourself. Each phase works as follows (a rough command-line analogy is shown after the list).

- **Map:** Splits the data on HDFS and assigns it to tasks. Each task extracts records (key-value pairs) one by one from its assigned input data, applies user-defined processing, and outputs the results as key-value pairs.
- **Shuffle:** Groups the Map output so that records with the same key are brought together. Data is transferred between servers at this point, so if the amount of transferred data is large, it can become a bottleneck for the entire job.
- **Reduce:** After the Shuffle, applies user-defined processing to the data grouped by key.
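
As a rough local analogy using only standard Unix tools (not Hadoop itself), the word-count example that appears later in this article can be mimicked by a pipeline in which tr plays the role of Map, sort the role of Shuffle, and uniq -c the role of Reduce:


#Map: emit one word per line, Shuffle: group identical words, Reduce: count them
echo "apple lemon apple lemon lemon grape" | tr ' ' '\n' | sort | uniq -c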


If you tried to implement this kind of parallel and distributed processing without a framework like MapReduce, you would have to think about many things yourself: into what units a job should be divided, on which machine each task should run, how the results of the tasks should be combined into one, and how to recover from a server failure along the way.

Hadoop architecture

Hadoop has three major versions, 1, 2, and 3, each with a different architecture. The main difference between Hadoop 1 and Hadoop 2 is the **change in the MapReduce architecture**. The MapReduce architecture in Hadoop 1 is called **MRv1**; in Hadoop 2, MapReduce runs on **YARN (Yet Another Resource Negotiator)** and is called **MRv2**.

| Hadoop 1 | Hadoop 2 |
| --- | --- |
| HDFS | HDFS |
| MapReduce (MRv1) | MapReduce (MRv2) / YARN |

There does not appear to be a **major** architectural change from Hadoop 2 to Hadoop 3 (as far as I can tell), so I will cover only the Hadoop 1 and Hadoop 2 architectures here.

Hadoop 1

A Hadoop cluster consists of two types of servers: a **master server group** that manages the entire cluster and a **slave server group** that performs the actual data processing. HDFS and MapReduce each have their own master and slave servers. Basically, there is one master server for each and multiple slave servers.

HDFS architecture

- **NameNode:** The HDFS master server. It manages metadata such as where data is located across the cluster. Because HDFS metadata is kept in memory, it can respond immediately.
- **DataNode:** An HDFS slave server. It handles the requests assigned to it (reading and writing blocks) and returns responses. The storage of multiple DataNodes is combined into one logical storage, so users do not need to be aware of the individual storage devices when using HDFS. (The hdfs dfsadmin -report example after this list shows the DataNodes that make up a cluster.)
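
On a running cluster (for example, the pseudo-distributed environment built later in this article), you can ask the NameNode to report the DataNodes it knows about and their capacity:


#Ask the NameNode for a report on all DataNodes in the cluster
sudo -u hdfs hdfs dfsadmin -report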

MapReduce (MRv1) architecture

- **JobTracker:** The MapReduce (MRv1) master server. It manages jobs (for example, notifying the JobClient of job progress and completion), manages resources (dividing one job into multiple tasks and allocating tasks to each slave server), and also keeps the job history. The JobTracker can be a single point of failure.
- **TaskTracker:** A MapReduce (MRv1) slave server. It executes the Map and Reduce tasks assigned to it, returns the results, and manages the resources of its own node.
- **JobClient:** Used to submit jobs, change their priority, kill them forcibly, and so on.


Normally a DataNode and a TaskTracker run on the same machine, and the TaskTracker preferentially processes the data held by the DataNode on that machine. This reduces network communication costs.

Hadoop 2

The main architectural difference between Hadoop 1 and Hadoop 2 is in MapReduce, so the HDFS architecture is omitted here.

MapReduce (MRv2) architecture

In MapReduce (MRv1), when the number of tasks reaches the thousands to tens of thousands, load concentrates on the JobTracker and it can become a bottleneck. Also, because a cluster has only one JobTracker, distributing the load means preparing additional clusters, which lowers resource utilization efficiency and increases the number of JobTrackers (each a single point of failure) that have to be monitored.

**YARN** was introduced to address these issues. In YARN, the functions of the JobTracker and TaskTracker are changed as follows.

| MapReduce (MRv1) | MapReduce (MRv2) / YARN |
| --- | --- |
| JobTracker | ResourceManager, ApplicationMaster, JobHistoryServer |
| TaskTracker | NodeManager |

- **ResourceManager:** Takes over resource management from the JobTracker. Resources are managed centrally by the ResourceManager, which improves resource utilization efficiency.
- **ApplicationMaster:** Takes over job management from the JobTracker. It can be implemented by users, so you can manage your own kinds of jobs; in addition to MapReduce, other distributed processing frameworks such as Apache Spark and Apache Tez are supported. Also, because an ApplicationMaster is launched per job, the bottleneck that appears when the number of tasks grows can be avoided.
- **JobHistoryServer:** Takes over job history management from the JobTracker.
- **NodeManager:** Takes over the per-node resource management that the TaskTracker used to do. (The yarn commands shown after this list let you inspect NodeManagers and running applications.)
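
YARN also ships command-line tools that show these components on a running cluster; the commands below are standard YARN CLI calls and can be run inside the container once Hadoop has been started as described later:


#List the NodeManagers registered with the ResourceManager
yarn node -list

#List the applications (each with its own ApplicationMaster) known to the ResourceManager
yarn application -list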

Environment construction with Docker

We will now build an environment in which Hadoop can run. As mentioned above, Hadoop works as a collection of cooperating components, so distributions that bundle the various pieces of software are provided. Using a distribution makes it easy to build an environment for running distributed processing. This time, we install CDH as the distribution, on Docker.

In addition, Hadoop can run in one of the following three operation modes. This time we choose pseudo-distributed mode, which makes it easy to check the behavior.

- **Local mode:** Runs MapReduce on a single server without using HDFS.
- **Pseudo-distributed mode:** Runs MapReduce with HDFS on a single server.
- **Fully distributed mode:** Runs MapReduce with HDFS on multiple servers.

The package structure is as follows.

Package configuration


.
├── Dockerfile
└── main
    ├── WordCount.java  #Hadoop job (Java)
    ├── scripts  #Hadoop startup script etc.
    │   ├── create-input-text.sh
    │   ├── execute-wordcount-python.sh
    │   ├── execute-wordcount.sh
    │   ├── make-jar.sh
    │   └── start-hadoop.sh
    └── streaming  #Hadoop Streaming Job (Python)
        └── python
            ├── map.py
            └── reduce.py

Here is the Dockerfile to use. Hadoop is a Java application, so we install the JDK. For the installation of CDH, I referred to here.

Dockerfile


FROM centos:centos7

RUN yum -y update
RUN yum -y install sudo

#Installation: JDK
RUN yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel

#Setting environment variables (necessary when compiling)
ENV JAVA_HOME /usr/lib/jvm/java-1.8.0-openjdk
ENV PATH $PATH:$JAVA_HOME/bin
# tools.jar: contains javac compiler
ENV HADOOP_CLASSPATH $JAVA_HOME/lib/tools.jar

#Installation: CDH 5 package
##Build yum repository
RUN rpm --import http://archive.cloudera.com/cdh5/redhat/6/x86_64/cdh/RPM-GPG-KEY-cloudera
RUN rpm -ivh http://archive.cloudera.com/cdh5/one-click-install/redhat/6/x86_64/cloudera-cdh-5-0.x86_64.rpm
##Pseudo-distributed mode settings and install packages that provide YARN, HDFS, etc.
RUN yum -y install hadoop-conf-pseudo

ADD main main
RUN chmod +x -R main

WORKDIR main

#Keep the container running even after the command finishes
CMD ["tail", "-f", "/dev/null"]

Now, create a Docker image from this Dockerfile.

docker image build -t {Namespace/Image name:Tag name} .

If the build succeeds, start the container. Once Hadoop is running you will be able to access the web interface at http://localhost:50070, so publish that port.

docker container run --name {Container name} -d -p 50070:50070 {Namespace/Image name:Tag name}
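
If you also want to browse the YARN ResourceManager web UI, which listens on port 8088 by default in Hadoop 2, you can publish that port as well. This is an optional variation, not part of the original steps:


docker container run --name {Container name} -d \
  -p 50070:50070 \
  -p 8088:8088 \
  {Namespace/Image name:Tag name}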

After successfully starting the container, enter the container so that you can perform command operations.

docker exec -it {Container name} /bin/bash
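
Inside the container, a couple of quick optional checks confirm that the JDK and the Hadoop packages installed by CDH are available:


#Confirm that the JDK and the Hadoop command are installed
java -version
hadoop version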

Run scripts/start-hadoop.sh to start Hadoop.

scripts/start-hadoop.sh


#!/usr/bin/env bash

#Format the metadata area managed by NameNode
sudo -u hdfs hdfs namenode -format

#Start HDFS
for x in `cd /etc/init.d ; ls hadoop-hdfs-*` ; do sudo service $x start ; done

#Create the initial directory structure on HDFS
sudo /usr/lib/hadoop/libexec/init-hdfs.sh

#Verify the HDFS directory tree and permissions that were created
sudo -u hdfs hadoop fs -ls -R /

#Start YARN
sudo service hadoop-yarn-resourcemanager start
sudo service hadoop-yarn-nodemanager start
sudo service hadoop-mapreduce-historyserver start
[root@xxxxxxxxx main]# ./scripts/start-hadoop.sh

After Hadoop has started, you can access the web interface at http://localhost:50070 and see the cluster status, job execution progress, and results from the GUI.
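
You can also confirm from the command line inside the container that the individual services are up; the service names below are the same ones used in start-hadoop.sh:


#Check the YARN and MapReduce services started by the script
sudo service hadoop-yarn-resourcemanager status
sudo service hadoop-yarn-nodemanager status
sudo service hadoop-mapreduce-historyserver status

#Confirm that HDFS responds
sudo -u hdfs hadoop fs -ls /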


Implementing and running MapReduce

Now that the environment has been built, let's actually create a MapReduce application. MapReduce applications can be written in Java as well as languages such as Pig Latin and HiveQL.

Java

WordCount.java is an example implementation of a MapReduce application in Java. It extracts words from an input text file and counts how many times each word appears.

WordCount.java


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCount extends Configured implements Tool {
    /**
     * A class that extends Mapper<input key type, input value type, output key type, output value type>.
     */
    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            //Initialization process
        }

        /**
         *Describe Map processing.
         *
         * @param key Byte offset value indicating the position of the byte from the beginning (usually not used)
         * @param value 1 row of data
         * @param context Access to job settings and I / O data through Context
         */
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            //Cleanup processing
        }
    }

    /**
     * A class that extends Reducer<input key type, input value type, output key type, output value type>.
     */
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            //Initialization process
        }

        /**
         *Describe Reduce processing.
         *
         * @param key Map Processing output (key)
         * @param values Map processing output (value iterable)
         * @param context Context
         */
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            //Cleanup processing
        }
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        //Create the job (it is submitted to the cluster when waitForCompletion is called below)
        Job job = Job.getInstance(getConf(), "WordCount");

        //Unlike Map tasks, where the number is automatically determined according to the input data, you need to specify the number of Reduce tasks yourself.
        job.setNumReduceTasks(2);

        //Specify one of the classes stored in the jar file
        job.setJarByClass(WordCount.class);

        //Specify which class to use as Mapper, Combiner, Reducer
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //Input / output data from a text file
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        //Directory path for I / O
        TextInputFormat.addInputPath(job, new Path(args[0]));
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        //Wait for the job to complete
        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new WordCount(), args);
        System.exit(res);
    }
}

To describe the Map processing, you create a class that extends org.apache.hadoop.mapreduce.Mapper, and similarly, to describe the Reduce processing, you create a class that extends org.apache.hadoop.mapreduce.Reducer. Also, in Hadoop, org.apache.hadoop.io.Text corresponds to String and org.apache.hadoop.io.IntWritable corresponds to int.

In order to run a MapReduce application implemented in Java, you need to compile and create a jar file.

scripts/make-jar.sh


#!/usr/bin/env bash

#compile
hadoop com.sun.tools.javac.Main WordCount.java

#Creating a jar
jar cf wc.jar WordCount*.class
[root@xxxxxxxxx main]# ./scripts/make-jar.sh

Also, prepare a text file for input.

scripts/create-input-text.sh


#!/usr/bin/env bash

#Create a text file for input
echo "apple lemon apple lemon lemon grape" > input.txt

#Place the input text file in HDFS
sudo -u hdfs hadoop fs -mkdir -p /user/hdfs/input
sudo -u hdfs hadoop fs -put input.txt /user/hdfs/input
[root@xxxxxxxxx main]# ./scripts/create-input-text.sh
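
Before running the job, you can confirm that the input file has actually been placed in HDFS:


#Check that the input file exists on HDFS
sudo -u hdfs hadoop fs -ls /user/hdfs/input
sudo -u hdfs hadoop fs -cat /user/hdfs/input/input.txt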

Now that we're ready, let's run.

scripts/execute-wordcount.sh


#!/usr/bin/env bash

#Run WordCount.java (packaged into wc.jar)
# hadoop jar {jar file path} {Main class name} {Input file path} {Output destination path}
sudo -u hdfs hadoop jar wc.jar WordCount /user/hdfs/input/input.txt /user/hdfs/output01

#View results
sudo -u hdfs hadoop fs -ls /user/hdfs/output01
sudo -u hdfs hadoop fs -cat /user/hdfs/output01/part-r-*
[root@xxxxxxxxx main]# ./scripts/execute-wordcount.sh

If the job succeeds, a file called _SUCCESS is created under the output path, and the output is stored in one or more files named part-r-*. The results are as follows:

part-r-00000


apple   2
grape   1
lemon   3
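
Because setNumReduceTasks(2) is specified, the output may be spread over more than one part-r-* file. If you want everything in a single local file, hadoop fs -getmerge will concatenate the part files; the local file name here is just an example:


#Merge all part files of the job output into one local file
sudo -u hdfs hadoop fs -getmerge /user/hdfs/output01 /tmp/wordcount-result.txt
cat /tmp/wordcount-result.txt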

Python (Hadoop Streaming)

Hadoop Streaming is an interface for **running MapReduce applications in languages other than Java**. It passes data through standard input and output, which is less convenient than the Java MapReduce API, but it lets you develop in a language you are familiar with. This time I will try it in Python.

In Hadoop Streaming, in addition to the input destination path and output destination path, it is necessary to specify the path of the file in which the map processing and reduce processing to be executed are defined.

scripts/execute-wordcount-python.sh


#!/usr/bin/env bash

#Run Hadoop Streaming
sudo -u hdfs hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.6.0-mr1-cdh5.16.2.jar \
-input /user/hdfs/input/input.txt -output /user/hdfs/output02 \
-mapper /main/streaming/python/map.py -reducer /main/streaming/python/reduce.py

#View results
sudo -u hdfs hadoop fs -ls /user/hdfs/output02
sudo -u hdfs hadoop fs -cat /user/hdfs/output02/part-*
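
Note that -mapper and -reducer above point at local paths, which works here because pseudo-distributed mode runs everything on one machine. On a multi-node cluster you would also ship the scripts to the worker nodes, for example with the generic -files option; the following is only a sketch with a hypothetical output path, not part of the original steps:


#Ship the scripts to the workers with -files (a sketch, output path is hypothetical)
sudo -u hdfs hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.6.0-mr1-cdh5.16.2.jar \
  -files /main/streaming/python/map.py,/main/streaming/python/reduce.py \
  -input /user/hdfs/input/input.txt -output /user/hdfs/output03 \
  -mapper map.py -reducer reduce.py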

map.py reads lines from standard input, generates (word, 1) key-value pairs, and writes them to standard output.

streaming/python/map.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

import re
import sys


#Split one line on whitespace and generate (word, 1) key-value pairs
def map_fn(line):
    return [(key, 1) for key in re.split(r'\s+', line.strip()) if key]

#Write the key-value pairs to standard output, one per line, tab-separated
def output(records):
    for key, value in records:
        print('{0}\t{1}'.format(key, value))

#Receive input from standard input
for l in sys.stdin:
    output(map_fn(l))

reduce.py actually counts the number of times a word appears and outputs the final processing result to standard output.

streaming/python/reduce.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

import re
import sys

results = {}


#Count the number of times a word appears
def reduce_fn(line):
    key, value = re.split(r'\t', line.strip())
    if key not in results:
        results[key] = 0
    results[key] = results[key] + int(value)

#Output the key-value pairs (final result) to standard output
def output(records):
    for k, v in records:
        print('{0}\t{1}'.format(k, v))

#Receive map processing output from standard input
for l in sys.stdin:
    reduce_fn(l)
output(sorted(results.items()))
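
Because both scripts only use standard input and output, you can sanity-check them locally before submitting the job, with sort standing in for the Shuffle phase:


#Emulate the streaming pipeline locally; sort stands in for Shuffle
cat input.txt | ./streaming/python/map.py | sort | ./streaming/python/reduce.py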

If the input file is the same as in Java, you'll probably get similar results from the file in the destination path.

part-00000


apple   2
grape   1
lemon   3

Hadoop ecosystem

Hadoop has many other major components. It is hard to cover them all here, so I will add to the list below as needed.

| Component | Overview |
| --- | --- |
| Pig | Processing can be defined in a DSL (domain-specific language) called Pig Latin, so MapReduce applications can be written more easily and with less code than in Java. |
| Hive | Processing can be defined in a SQL-like DSL called HiveQL. |
| HBase | A NoSQL distributed database built on top of HDFS. It complements the areas that HDFS is not good at. |

Summary

This article is a summary of my first steps with Hadoop. There are parts I only understand roughly and major Hadoop components I only know at the overview level, so I will keep learning. There are also various managed services on clouds such as AWS and GCP, and I would like to actually try them and learn how their usability differs.

If you find any mistakes, please let me know. Also, please tell me about any useful sites!

Reference URL

- Thorough Introduction to Hadoop, 2nd Edition: Building an Open-Source Distributed Processing Environment
- Introduction to Apache Hadoop, a Distributed Processing Platform, and the Latest Technology Trends in the Hadoop Ecosystem
- A Comfortable Hadoop Life with Pigs and Beehives!? – Apache Spark and Apache Pig & Apache Hive –
- Roughly Organizing the Hadoop Ecosystem
- Run MapReduce in Python with Hadoop Streaming
