[PYTHON] Premiers pas sur Docker Apache Hadoop

introduction

J'utilise Apache Beam comme premier cadre de traitement distribué parallèle, mais les termes et concepts traditionnels tels que MapReduce apparaissent souvent dans l'apprentissage de ce sujet. C'est pourquoi j'ai essayé de démarrer avec Apache Hadoop. Cette fois, nous utilisons Docker pour rendre l'environnement aussi simple que possible.

1600px-Hadoop_logo_new.svg.png

Présentation de Hadoop

Hadoop est un ** framework de traitement distribué ** pour traiter de gros volumes de données. Normalement, il fonctionne sous ** Linux **. Il évolue bien, donc même si vous traitez plus de données, vous pouvez améliorer les performances en ajoutant ** plus de serveurs. ** **

Hadoop se compose de deux systèmes principaux:

Hadoop ne fonctionne pas avec un seul composant, mais avec plusieurs composants travaillant ensemble, tels que HDFS et le framework MapReduce. ** Ces composants clés de Hadoop sont parfois appelés l'écosystème Hadoop.

HDFS (Hadoop Distributed File System) Sur HDFS, de grandes quantités de données sont ** divisées en petites unités (blocs) ** et placées dans les systèmes de fichiers de plusieurs serveurs. Par exemple, si la taille des données est de 1 Go (1024 Mo) et la taille du bloc est de 64 Mo, les données seront divisées en 16 blocs et distribuées sur plusieurs serveurs.

En distribuant les données à plusieurs serveurs de cette manière et en les traitant en parallèle, on peut s'attendre à ce que le débit s'améliore. ** La communication entre le stockage et les serveurs est coûteuse **, donc les données lues par chaque serveur fonctionnent autant que possible sur ce serveur, et enfin le résultat traité par chaque serveur est traité via le réseau. Transférez et combinez en un seul résultat.

De plus, ** les blocs divisés sont stockés sur plusieurs serveurs de sorte qu'en cas de défaillance d'un serveur, les données ne seront pas perdues ou le traitement échouera. ** **

スクリーンショット 2020-04-03 17.38.03.png

Lors de l'utilisation de HDFS, les utilisateurs n'ont pas à se soucier de l'exécution de plusieurs serveurs en arrière-plan ou de la façon dont les fichiers sont divisés en blocs.

MapReduce MapReduce divise un travail en plusieurs tâches et les exécute en parallèle. Le traitement MapReduce comprend trois processus principaux appelés ** Map **, ** Shuffle ** et ** Reduce **. Parmi ceux-ci, ** Shuffle ** s'exécute automatiquement, vous n'avez donc pas à définir d'actions. Le contenu de chaque processus est le suivant.

スクリーンショット 2020-04-03 17.38.49.png

Si vous essayez d'implémenter un tel traitement distribué en parallèle sans un cadre comme MapReduce, dans quel type d'unité un travail doit-il être divisé, sur quel ordinateur la tâche doit-elle être exécutée et quel doit être le résultat de chaque tâche? Il est nécessaire de prendre en compte de nombreux éléments tels que la façon de les combiner en un seul ou comment récupérer après une panne de serveur en cours de route.

Architecture Hadoop

Hadoop a trois versions principales, 1, 2 et 3, chacune avec une architecture différente. Et la principale différence entre Hadoop 1 et 2 réside dans les ** modifications architecturales de MapReduce **. L'architecture MapReduce dans Hadoop 1 s'appelle ** MRv1 **, et dans Hadoop 2, MapReduce fonctionne sur la technologie ** YARN (Yet-Another-Resource-Negotiator) **, qui s'appelle ** MRv2. Appeler **.

Hadoop 1 Hadoop 2
HDFS HDFS
MapReduce (MRv1) MapReduce (MRv2) / YARN

Il semble qu'il n'y ait pas (probablement) de changement architectural ** majeur ** de Hadoop 2 à Hadoop 3, donc je vais couvrir les architectures Hadoop 1 et Hadoop 2 ici.

Hadoop 1 Un cluster Hadoop se compose de deux types de serveurs: le ** groupe de serveurs maîtres ** qui gère l'ensemble du cluster et le ** groupe de serveurs esclaves ** qui est en charge du traitement réel des données. HDFS et MapReduce ont chacun un serveur maître et un serveur esclave. Fondamentalement, il y a un serveur maître chacun et plusieurs serveurs esclaves.

Architecture HDFS

Architecture MapReduce (MRv1)

スクリーンショット 2020-04-03 17.38.23.png

Normalement, le DataNode et le TaskTracker se trouvent sur le même ordinateur et le TaskTracker exécute d'abord le travail sur les données du DataNode sur le même ordinateur. Cela peut réduire les coûts de communication réseau.

Hadoop 2 La différence architecturale entre Hadoop 1 et Hadoop 2 réside principalement dans MapReduce. Par conséquent, nous omettons ici l'architecture HDFS.

Architecture MapReduce (MRv2)

Avec MapReduce (MRv1), lorsque le nombre de tâches se situe entre des milliers et des dizaines de milliers, la charge sur JobTracker est concentrée et peut devenir un goulot d'étranglement. De plus, comme nous utilisons un seul JobTracker dans le cluster, nous devons préparer un nouveau cluster si nous voulons répartir la charge. La répartition de la charge de cette manière entraîne des problèmes tels qu'une efficacité d'utilisation des ressources réduite et des objectifs de surveillance accrus en raison d'une augmentation du nombre de JobTrackers, qui est un point de défaillance unique.

** YARN ** a été introduit pour résoudre ces problèmes. Dans YARN, les fonctions de JobTracker et TaskTracker sont modifiées comme suit.

MapReduce (MRv1) MapReduce (MRv2) / YARN
JobTracker ResourceManager、ApplicationMaster、JobHistoryServer
TaskTracker NodeManager

Créer un environnement avec Docker

Nous allons créer l'environnement d'exploitation pour Hadoop. Comme mentionné ci-dessus, Hadoop fonctionne en reliant plusieurs composants. Par conséquent, une distribution qui résume divers logiciels est fournie. En utilisant une distribution, vous pouvez facilement créer un environnement pour exécuter le traitement distribué. Cette fois, installez CDH sur Docker en tant que distribution.

De plus, Hadoop vous permet de sélectionner le mode de fonctionnement parmi les trois modes suivants. Cette fois, sélectionnez le mode pseudo-distribué qui vous permet de vérifier facilement le fonctionnement.

La structure du package est la suivante.

Configuration du package


.
├── Dockerfile
├── main
    ├── WordCount.java  #Tâche Hadoop (Java)
    ├── scripts  #Script de démarrage Hadoop, etc.
    │   ├── create-input-text.sh
    │   ├── execute-wordcount-python.sh
    │   ├── execute-wordcount.sh
    │   ├── make-jar.sh
    │   └── start-hadoop.sh
    └── streaming  #Tâche de streaming Hadoop (Python)
        └── python
            ├── map.py
            └── reduce.py

Voici le Dockerfile à utiliser. Hadoop est une application Java, alors installez le JDK. Pour l'installation de CDH, je me suis référé à ici.

Dockerfile


FROM centos:centos7

RUN yum -y update
RUN yum -y install sudo

#Installation: JDK
RUN yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel

#Définition des variables d'environnement (nécessaire lors de la compilation)
ENV JAVA_HOME /usr/lib/jvm/java-1.8.0-openjdk
ENV PATH $PATH:$JAVA_HOME/bin
# tools.jar: contient le compilateur javac
ENV HADOOP_CLASSPATH $JAVA_HOME/lib/tools.jar

#Installation: paquet CDH 5
##Construire le référentiel yum
RUN rpm --import http://archive.cloudera.com/cdh5/redhat/6/x86_64/cdh/RPM-GPG-KEY-cloudera
RUN rpm -ivh http://archive.cloudera.com/cdh5/one-click-install/redhat/6/x86_64/cloudera-cdh-5-0.x86_64.rpm
##Paramètres de mode pseudo-distribué et installation de packages fournissant YARN, HDFS, etc.
RUN yum -y install hadoop-conf-pseudo

ADD main main
RUN chmod +x -R main

WORKDIR main

#Continuez à démarrer le conteneur même après avoir exécuté la commande
CMD ["tail", "-f", "/dev/null"]

Maintenant, créez une image Docker à partir de ce Dockerfile.

docker image build -t {Espace de nom/Nom de l'image:Nom de la balise} .

Démarrez le conteneur si la génération réussit. Après avoir démarré Hadoop, vous pourrez accéder à l'interface Web à l'adresse http: // localhost: 50070, donc le transfert de port.

docker container run --name {Nom du conteneur} -d -p 50070:50070 {Espace de nom/Nom de l'image:Nom de la balise}

Si le conteneur démarre correctement, il entrera dans le conteneur pour les opérations de commande.

docker exec -it {Nom du conteneur} /bin/bash

Exécutez scripts / start-hadoop.sh pour démarrer Hadoop.

scripts/start-hadoop.sh


#!/usr/bin/env bash

#Mettre en forme la zone de métadonnées gérée par NameNode
sudo -u hdfs hdfs namenode -format

#Démarrez HDFS
for x in `cd /etc/init.d ; ls hadoop-hdfs-*` ; do sudo service $x start ; done

#Initialisation
sudo /usr/lib/hadoop/libexec/init-hdfs.sh

#Accorder des autorisations sur les fichiers HDFS
sudo -u hdfs hadoop fs -ls -R /

#Démarrer YARN
sudo service hadoop-yarn-resourcemanager start
sudo service hadoop-yarn-nodemanager start
sudo service hadoop-mapreduce-historyserver start
[root@xxxxxxxxx main]# ./scripts/start-hadoop.sh

Une fois Hadoop démarré, vous pouvez accéder à l'interface Web à l'adresse http: // localhost: 50070 et afficher l'état du cluster, la progression de l'exécution du travail et les résultats à partir de l'interface graphique.

スクリーンショット 2020-04-03 3.17.15.png

Implémentation et exécution de MapReduce

Maintenant que l'environnement a été construit, créons en fait une application MapReduce. Les applications MapReduce peuvent être écrites en Java ainsi que dans des langages tels que Pig Latin et HiveQL.

Java WordCount.java est un exemple d'implémentation d'une application MapReduce en Java. C'est une application qui extrait les mots du fichier texte d'entrée et compte le nombre de mots.

WordCount.java


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCount extends Configured implements Tool {
    /**
     * Mapper<Type de clé d'entrée,Type de valeur d'entrée,Type de clé de sortie,Type de valeur de sortie>Classe héritée de.
     */
    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            //Processus d'initialisation
        }

        /**
         *Décrire le traitement de la carte.
         *
         * @clé de paramètre Valeur de décalage d'octet indiquant la position de la ligne depuis le début (généralement non utilisée)
         * @valeur du paramètre 1 ligne de données
         * @param context Accès aux paramètres du travail et aux données d'entrée / sortie via le contexte
         */
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            //Traitement de nettoyage
        }
    }

    /**
     * Reducer<Type de clé d'entrée,Type de valeur d'entrée,Type de clé de sortie,Type de valeur de sortie>Classe héritée de.
     */
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            //Processus d'initialisation
        }

        /**
         *Décrire Réduire le traitement.
         *
         * @param key Sortie du traitement de la carte (clé)
         * @valeurs de paramètre Sortie du traitement de la carte (valeur itérable)
         * @param context Context
         */
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            //Traitement de nettoyage
        }
    }

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        //Soumettre un travail à JobTracker
        Job job = Job.getInstance(getConf(), "WordCount");

        //Contrairement aux tâches de mappage, où le nombre est automatiquement déterminé en fonction des données d'entrée, vous devez spécifier vous-même le nombre de tâches de réduction.
        job.setNumReduceTasks(2);

        //Spécifiez l'une des classes stockées dans le fichier jar
        job.setJarByClass(WordCount.class);

        //Spécifiez la classe à utiliser comme Mapper, Combiner, Reducer
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //Données d'entrée / sortie à partir d'un fichier texte
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        //Chemin du répertoire pour l'entrée et la sortie
        TextInputFormat.addInputPath(job, new Path(args[0]));
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        //Attendez que le travail soit terminé
        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new WordCount(), args);
        System.exit(res);
    }
}

Pour décrire le processus Map, créez une classe qui hérite de org.apache.hadoop.mapreduce.Mapper, et de même, pour décrire le processus Reduce, créez une classe qui hérite de org.apache.hadoop.mapreduce.Reducer. Je vais. De plus, dans Hadoop, org.apache.hadoop.io.Text signifie String et org.apache.hadoop.io.IntWritable signifie int.

Pour exécuter l'application MapReduce implémentée en Java, vous devez compiler et créer un fichier jar.

scripts/make-jar.sh


#!/usr/bin/env bash

#compiler
hadoop com.sun.tools.javac.Main WordCount.java

#Créer un pot
jar cf wc.jar WordCount*.class
[root@xxxxxxxxx main]# ./scripts/make-jar.sh

Préparez également un fichier texte pour la saisie.

Bash:./scripts/create-input-text.sh


#!/usr/bin/env bash

#Créer un fichier texte pour la saisie
echo "apple lemon apple lemon lemon grape" > input.txt

#Placez le fichier texte d'entrée dans HDFS
sudo -u hdfs hadoop fs -mkdir -p /user/hdfs/input
sudo -u hdfs hadoop fs -put input.txt /user/hdfs/input
[root@xxxxxxxxx main]# ./scripts/create-input-text.sh

Maintenant que nous sommes prêts, courons.

scripts/execute-wordcount.sh


#!/usr/bin/env bash

# WordCount.Exécutez java
# hadoop jar {chemin du fichier jar} {Nom de la classe principale} {Chemin du fichier d'entrée} {Chemin de destination de sortie}
sudo -u hdfs hadoop jar wc.jar WordCount /user/hdfs/input/input.txt /user/hdfs/output01

#Voir les résultats
sudo -u hdfs hadoop fs -ls /user/hdfs/output01
sudo -u hdfs hadoop fs -cat /user/hdfs/output01/part-r-*
[root@xxxxxxxxx main]# ./scripts/execute-wordcount.sh

Si le travail réussit, un fichier appelé _SUCCESS sera généré sous le chemin de sortie. Vous pouvez également voir que les résultats de sortie sont stockés dans un ou plusieurs fichiers au format part-r- *, avec les résultats suivants:

part-r-00000


apple   2
grape   1
lemon   3

Python (Hadoop Streaming) Hadoop Streaming est une interface pour ** exécuter des applications MapReduce dans des langues autres que Java **. Il est peu pratique par rapport à l'application MapReduce de Java car il utilise des E / S standard pour transmettre des données, mais il peut être développé dans des langages familiers. Cette fois, je vais l'essayer avec Python.

Dans Hadoop Streaming, en plus du chemin de destination d'entrée et du chemin de destination de sortie, il est nécessaire de spécifier le chemin du fichier dans lequel sont définis le traitement de la carte et le traitement de réduction à exécuter.

scripts/execute-wordcount-python.sh


#!/usr/bin/env bash

#Exécuter le streaming Hadoop
sudo -u hdfs hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.6.0-mr1-cdh5.16.2.jar \
-input /user/hdfs/input/input.txt -output /user/hdfs/output02 \
-mapper /main/streaming/python/map.py -reducer /main/streaming/python/reduce.py

#Voir les résultats
sudo -u hdfs hadoop fs -ls /user/hdfs/output02
sudo -u hdfs hadoop fs -cat /user/hdfs/output02/part-*

map.py génère la valeur clé de <mot 1> à partir de l'entrée standard et la renvoie vers la sortie standard.

streaming/python/map.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

import re
import sys


#Diviser une ligne par un délimiteur de caractère vide(mot, 1)Générer une valeur clé de
def map_fn(line):
    return [(key, 1) for key in re.split(r'\s', line.strip()) if key]

#Valeur de la clé de sortie vers la sortie standard
def output(records):
    for key, value in records:
        print '{0}\t{1}'.format(key, value)

#Recevoir l'entrée de l'entrée standard
for l in sys.stdin:
    output(map_fn(l))

reduction.py compte en fait le nombre de fois qu'un mot apparaît et affiche le résultat final du traitement sur la sortie standard.

streaming/python/reduce.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

import re
import sys

results = {}


#Comptez le nombre de fois qu'un mot apparaît
def reduce_fn(line):
    key, value = re.split(r'\t', line.strip())
    if not key in results:
        results[key] = 0
    results[key] = results[key] + int(value)

#Valeur de clé de sortie (résultat du traitement final) vers la sortie standard
def output(records):
    for k, v in records:
        print '{0}\t{1}'.format(k, v)

#Recevoir la sortie du traitement de la carte depuis l'entrée standard
for l in sys.stdin:
    reduce_fn(l)
output(sorted(results.items()))

Si le fichier d'entrée est le même qu'en Java, vous obtiendrez probablement des résultats similaires à partir du fichier dans le chemin de destination.

part-00000


apple   2
grape   1
lemon   3

Écosystème Hadoop

Il existe de nombreux autres composants majeurs de Hadoop, mais il est difficile de tous les voir, donc j'ajouterai plus d'éléments à chacun d'eux si nécessaire.

composant Aperçu
Pig DSL appelé Pig Latin(Domain Specific Language)Vous pouvez définir le processus avec, ce qui facilite la création d'applications MapReduce avec moins de code que Java.
Hive Vous pouvez définir le processus dans un DSL de type SQL appelé HiveQL.
HBase Une base de données distribuée de NoSQL construite sur HDFS. C'est un système pour compléter les parties pour lesquelles HDFS n'est pas bon.

Résumé

Cette fois, j'ai écrit un article comme résumé lorsque j'ai commencé avec Hadoop. Il y a certaines parties que je ne comprends qu'en gros, et certains des principaux composants de Hadoop que je ne connais que sur la vue d'ensemble, je vais donc continuer à apprendre. En outre, il existe divers services gérés fournis dans le cloud, tels qu'AWS et GCP, j'aimerais donc en savoir plus sur les différences d'utilisabilité en les déplaçant réellement.

Si vous trouvez des erreurs, veuillez demander une correction. Veuillez également nous faire savoir s'il existe des sites utiles!

URL de référence

Recommended Posts

Premiers pas sur Docker Apache Hadoop
Premiers pas avec apache2
Commençant par USD sur Windows
Premiers pas avec Python 3.8 sous Windows
pykintone sur Docker
J'ai commencé Docker
Grails pour commencer
Démarrer avec Python avec 100 coups sur le traitement du langage
Essayez Apache Spark avec Jupyter Notebook (sur Docker local
Premiers pas avec Android!
1.1 Premiers pas avec Python
Installer docker sur Fedora31
Premiers pas avec Python
Premiers pas avec Django 1
Introduction à l'optimisation
Installez Docker sur AWS
Premiers pas avec Numpy
Premiers pas avec Spark
Matériel à lire lors de la mise en route d'Apache Beam
Premiers pas avec Python
Premiers pas avec Pydantic
Premiers pas avec Jython
Installez Python 3.6 sur Docker
Premiers pas avec Django 2
Traduire Premiers pas avec TensorFlow
Introduction aux fonctions Python
Introduction à Tkinter 2: Button
[Linux] [Configuration initiale] Introduction
Premiers pas avec PKI avec Golang ―― 4
Django Getting Started: 2_ Créer un projet
Django Premiers pas: 1_Construction de l'environnement
Premiers pas avec Python Django (1)
Django Getting Started: intégration 4_MySQL
L'installation d'Apache échoue sur CentOS 8.2
Premiers pas avec Python Django (4)
Premiers pas avec Python Django (3)
Introduction à Python Django (6)
Premiers pas avec Django avec PyCharm
Exécutez IPython Notebook sur Docker
Installer Docker sur WSL Ubuntu 18.04
Premiers pas avec Python Django (5)
Comment résoudre les problèmes de planification linéaire avec PuLP