Ich verwende Apache Beam als mein erstes Framework für die parallele verteilte Verarbeitung, aber traditionelle Begriffe und Konzepte wie MapReduce tauchen häufig beim Lernen auf. Deshalb habe ich versucht, mit Apache Hadoop zu beginnen. Dieses Mal verwenden wir Docker, um die Umgebung so einfach wie möglich zu gestalten.
Hadoop ist ein ** verteiltes Verarbeitungsframework ** zur Verarbeitung großer Datenmengen. Normalerweise läuft es unter ** Linux **. Es lässt sich gut skalieren. Selbst wenn Sie mehr Daten verarbeiten, können Sie die Leistung verbessern, indem Sie ** mehr Server hinzufügen. ** ** **
Hadoop besteht aus zwei Hauptsystemen:
Hadoop funktioniert nicht mit einer einzelnen Komponente, sondern mit mehreren Komponenten, die zusammenarbeiten, z. B. HDFS und das MapReduce-Framework. ** Diese Schlüsselkomponenten von Hadoop werden manchmal als Hadoop-Ökosystem bezeichnet.
HDFS (Hadoop Distributed File System) Unter HDFS werden große Datenmengen ** in kleine Einheiten (Blöcke) ** unterteilt und in den Dateisystemen mehrerer Server abgelegt. Wenn die Datengröße beispielsweise 1 GB (1024 MB) und die Blockgröße 64 MB beträgt, werden die Daten in 16 Blöcke aufgeteilt und auf mehrere Server verteilt.
Durch die Verteilung und parallele Verarbeitung von Daten auf mehrere Server kann eine Verbesserung des Durchsatzes erwartet werden. ** Die Kommunikation zwischen Speicher und Servern ist teuer **, sodass die von jedem Server gelesenen Daten auf diesem Server so gut wie möglich funktionieren und schließlich das von jedem Server verarbeitete Ergebnis über das Netzwerk verarbeitet wird. Übertragen und kombinieren als ein Ergebnis.
Außerdem werden ** geteilte Blöcke auf mehreren Servern gespeichert, sodass bei einem Ausfall eines Servers keine Daten verloren gehen oder die Verarbeitung fehlschlägt. ** ** **
Bei der Verwendung von HDFS müssen sich Benutzer nicht um mehrere Server kümmern, die hinter den Kulissen ausgeführt werden, oder darum, wie Dateien in Blöcke unterteilt werden.
MapReduce MapReduce unterteilt einen Job in mehrere Aufgaben und führt sie parallel aus. Die MapReduce-Verarbeitung besteht aus drei Hauptprozessen: ** Map **, ** Shuffle ** und ** Reduce **. Von diesen wird ** Shuffle ** automatisch ausgeführt, sodass Sie keine Aktionen definieren müssen. Der Inhalt jedes Prozesses ist wie folgt.
Wenn Sie versuchen, eine solche parallele verteilte Verarbeitung ohne ein Framework wie MapReduce zu implementieren, in welche Art von Einheit sollte ein Job unterteilt werden, auf welchem Computer sollte die Aufgabe ausgeführt werden und was sollte das Ergebnis jeder Aufgabe sein? Es ist notwendig, viele Dinge zu berücksichtigen, wie man sie zu einem kombiniert oder wie man sich unterwegs von einem Serverausfall erholt.
Hadoop hat drei Hauptversionen, 1, 2 und 3, jede mit einer anderen Architektur. Der Hauptunterschied zwischen Hadoop 1 und 2 besteht in den Änderungen der MapReduce-Architektur. Die MapReduce-Architektur in Hadoop 1 heißt ** MRv1 **, und in Hadoop 2 läuft MapReduce auf der Technologie ** YARN (Yet-Another-Resource-Negotiator) **, die ** MRv2 heißt. Nennen **.
Hadoop 1 | Hadoop 2 |
---|---|
HDFS | HDFS |
MapReduce (MRv1) | MapReduce (MRv2) / YARN |
Es scheint (wahrscheinlich) keine ** größere ** architektonische Änderung von Hadoop 2 zu Hadoop 3 zu geben, daher werde ich hier die Hadoop 1- und Hadoop 2-Architekturen behandeln.
Hadoop 1 Ein Hadoop-Cluster besteht aus zwei Servertypen: der ** Master-Servergruppe **, die den gesamten Cluster verwaltet, und der ** Slave-Servergruppe **, die für die eigentliche Datenverarbeitung verantwortlich ist. Jedes HDFS und MapReduce verfügt über einen Master-Server und einen Slave-Server. Grundsätzlich gibt es jeweils einen Master-Server und mehrere Slave-Server.
Normalerweise befinden sich DataNode und TaskTracker auf demselben Computer, und der TaskTracker führt den Job zuerst für die Daten auf dem DataNode auf demselben Computer aus. Dies kann die Netzwerkkommunikationskosten senken.
Hadoop 2 Der architektonische Unterschied zwischen Hadoop 1 und Hadoop 2 liegt hauptsächlich in MapReduce. Daher werden wir hier die HDFS-Architektur weglassen.
Bei MapReduce (MRv1) konzentriert sich die Belastung von JobTracker auf einen Engpass, wenn die Anzahl der Aufgaben zwischen Tausenden und Zehntausenden liegt. Da wir einen einzelnen JobTracker innerhalb des Clusters verwenden, müssen wir auch einen neuen Cluster vorbereiten, wenn wir die Last verteilen möchten. Das Verteilen der Last auf diese Weise führt zu Problemen wie einer verringerten Ressourcennutzungseffizienz und erhöhten Überwachungszielen aufgrund einer Erhöhung der Anzahl von JobTrackern, was ein einzelner Fehlerpunkt ist.
** YARN ** wurde eingeführt, um diese Probleme anzugehen. In YARN werden die Funktionen von JobTracker und TaskTracker wie folgt geändert.
MapReduce (MRv1) | MapReduce (MRv2) / YARN |
---|---|
JobTracker | ResourceManager、ApplicationMaster、JobHistoryServer |
TaskTracker | NodeManager |
Wir werden die Betriebsumgebung für Hadoop erstellen. Wie oben erwähnt, verbindet Hadoop mehrere Komponenten. Daher wird eine Distribution bereitgestellt, die verschiedene Software zusammenfasst. Mithilfe einer Distribution können Sie problemlos eine Umgebung für die Ausführung der verteilten Verarbeitung erstellen. Installieren Sie dieses Mal CDH auf Docker als Distribution.
Darüber hinaus können Sie in Hadoop den Betriebsmodus aus den folgenden drei Modi auswählen. Wählen Sie dieses Mal den pseudoverteilten Modus aus, mit dem Sie den Vorgang einfach überprüfen können.
Die Paketstruktur ist wie folgt.
Paketkonfiguration
.
├── Dockerfile
├── main
├── WordCount.java #Hadoop Job (Java)
├── scripts #Hadoop-Startskript usw.
│ ├── create-input-text.sh
│ ├── execute-wordcount-python.sh
│ ├── execute-wordcount.sh
│ ├── make-jar.sh
│ └── start-hadoop.sh
└── streaming #Hadoop-Streaming-Job (Python)
└── python
├── map.py
└── reduce.py
Hier ist die Docker-Datei zu verwenden. Hadoop ist eine Java-Anwendung. Installieren Sie daher das JDK. Für die Installation von CDH habe ich auf [hier] verwiesen (https://docs.cloudera.com/documentation/enterprise/5-3-x/topics/cdh_qs_yarn_pseudo.html).
Dockerfile
FROM centos:centos7
RUN yum -y update
RUN yum -y install sudo
#Installation: JDK
RUN yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel
#Festlegen von Umgebungsvariablen (beim Kompilieren erforderlich)
ENV JAVA_HOME /usr/lib/jvm/java-1.8.0-openjdk
ENV PATH $PATH:$JAVA_HOME/bin
# tools.jar: enthält javac compiler
ENV HADOOP_CLASSPATH $JAVA_HOME/lib/tools.jar
#Installation: CDH 5-Paket
##Erstellen Sie ein leckeres Repository
RUN rpm --import http://archive.cloudera.com/cdh5/redhat/6/x86_64/cdh/RPM-GPG-KEY-cloudera
RUN rpm -ivh http://archive.cloudera.com/cdh5/one-click-install/redhat/6/x86_64/cloudera-cdh-5-0.x86_64.rpm
##Pseudo-verteilte Moduseinstellungen und Installationspakete, die YARN, HDFS usw. bereitstellen.
RUN yum -y install hadoop-conf-pseudo
ADD main main
RUN chmod +x -R main
WORKDIR main
#Starten Sie den Container auch nach Ausführung des Befehls weiter
CMD ["tail", "-f", "/dev/null"]
Erstellen Sie nun ein Docker-Image aus dieser Docker-Datei.
docker image build -t {Namensraum/Bildname:Verlinke den Namen} .
Starten Sie den Container, wenn der Build erfolgreich ist. Nach dem Starten von Hadoop können Sie unter http: // localhost: 50070 auf die Weboberfläche zugreifen.
docker container run --name {Containername} -d -p 50070:50070 {Namensraum/Bildname:Verlinke den Namen}
Wenn der Container erfolgreich gestartet wird, wird er für Befehlsoperationen in den Container eingegeben.
docker exec -it {Containername} /bin/bash
Führen Sie scripts / start-hadoop.sh aus, um Hadoop zu starten.
scripts/start-hadoop.sh
#!/usr/bin/env bash
#Formatieren Sie den von NameNode verwalteten Metadatenbereich
sudo -u hdfs hdfs namenode -format
#Starten Sie HDFS
for x in `cd /etc/init.d ; ls hadoop-hdfs-*` ; do sudo service $x start ; done
#Initialisieren
sudo /usr/lib/hadoop/libexec/init-hdfs.sh
#Erteilen Sie Berechtigungen für HDFS-Dateien
sudo -u hdfs hadoop fs -ls -R /
#Starten Sie YARN
sudo service hadoop-yarn-resourcemanager start
sudo service hadoop-yarn-nodemanager start
sudo service hadoop-mapreduce-historyserver start
[root@xxxxxxxxx main]# ./scripts/start-hadoop.sh
Nach dem Start von Hadoop können Sie unter http: // localhost: 50070 auf die Weboberfläche zugreifen und den Clusterstatus, den Fortschritt der Jobausführung und die Ergebnisse über die GUI anzeigen.
Nachdem die Umgebung erstellt wurde, erstellen wir eine MapReduce-Anwendung. MapReduce-Anwendungen können sowohl in Java als auch in Sprachen wie Pig Latin und HiveQL geschrieben werden.
Java WordCount.java ist eine Beispielimplementierung einer MapReduce-Anwendung in Java. Es ist eine Anwendung, die Wörter aus der Eingabetextdatei extrahiert und die Anzahl der Wörter zählt.
WordCount.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.StringTokenizer;
public class WordCount extends Configured implements Tool {
/**
* Mapper<Typ der Eingabetaste,Eingabewerttyp,Typ des Ausgabeschlüssels,Ausgabewerttyp>Klasse geerbt von.
*/
public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void setup(Context context) throws IOException, InterruptedException {
//Initialisierungsprozess
}
/**
*Beschreiben der Kartenverarbeitung.
*
* @param key Byte-Offset-Wert, der die Position der Zeile von Anfang an angibt (normalerweise nicht verwendet)
* @Parameterwert 1 Datenzeile
* @param context Zugriff auf Jobeinstellungen und Eingabe- / Ausgabedaten über den Kontext
*/
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
@Override
public void cleanup(Context context) throws IOException, InterruptedException {
//Bereinigungsverarbeitung
}
}
/**
* Reducer<Typ der Eingabetaste,Eingabewerttyp,Typ des Ausgabeschlüssels,Ausgabewerttyp>Klasse geerbt von.
*/
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
public void setup(Context context) throws IOException, InterruptedException {
//Initialisierungsprozess
}
/**
*Beschreiben Reduzieren Sie die Verarbeitung.
*
* @param key Ausgabe der Kartenverarbeitung (Schlüssel)
* @Parameterwerte Kartenverarbeitungsausgabe (Wert iterierbar)
* @param context Context
*/
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
@Override
public void cleanup(Context context) throws IOException, InterruptedException {
//Bereinigungsverarbeitung
}
}
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
//Senden Sie einen Job an JobTracker
Job job = Job.getInstance(getConf(), "WordCount");
//Im Gegensatz zu Kartenaufgaben, bei denen die Anzahl automatisch anhand der Eingabedaten ermittelt wird, müssen Sie die Anzahl der Aufgaben zum Reduzieren selbst angeben.
job.setNumReduceTasks(2);
//Geben Sie eine der in der JAR-Datei gespeicherten Klassen an
job.setJarByClass(WordCount.class);
//Geben Sie an, welche Klasse als Mapper, Combiner, Reducer verwendet werden soll
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//Eingabe / Ausgabe von Daten aus einer Textdatei
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//Verzeichnispfad für Ein- und Ausgabe
TextInputFormat.addInputPath(job, new Path(args[0]));
TextOutputFormat.setOutputPath(job, new Path(args[1]));
//Warten Sie, bis der Auftrag abgeschlossen ist
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new WordCount(), args);
System.exit(res);
}
}
Um den Map-Prozess zu beschreiben, erstellen Sie eine Klasse, die org.apache.hadoop.mapreduce.Mapper erbt, und erstellen Sie zur Beschreibung des Reduce-Prozesses eine Klasse, die org.apache.hadoop.mapreduce.Reducer erbt. Ich werde. In Hadoop bedeutet org.apache.hadoop.io.Text String und org.apache.hadoop.io.IntWritable int.
Um die in Java implementierte MapReduce-Anwendung auszuführen, müssen Sie eine JAR-Datei kompilieren und erstellen.
scripts/make-jar.sh
#!/usr/bin/env bash
#kompilieren
hadoop com.sun.tools.javac.Main WordCount.java
#Ein Glas erstellen
jar cf wc.jar WordCount*.class
[root@xxxxxxxxx main]# ./scripts/make-jar.sh
Bereiten Sie außerdem eine Textdatei für die Eingabe vor.
Bash:./scripts/create-input-text.sh
#!/usr/bin/env bash
#Erstellen Sie eine Textdatei für die Eingabe
echo "apple lemon apple lemon lemon grape" > input.txt
#Platzieren Sie die Eingabetextdatei in HDFS
sudo -u hdfs hadoop fs -mkdir -p /user/hdfs/input
sudo -u hdfs hadoop fs -put input.txt /user/hdfs/input
[root@xxxxxxxxx main]# ./scripts/create-input-text.sh
Jetzt, wo wir fertig sind, lass uns rennen.
scripts/execute-wordcount.sh
#!/usr/bin/env bash
# WordCount.Führen Sie Java aus
# hadoop jar {JAR-Dateipfad} {Name der Hauptklasse} {Pfad der Eingabedatei} {Zielpfad ausgeben}
sudo -u hdfs hadoop jar wc.jar WordCount /user/hdfs/input/input.txt /user/hdfs/output01
#Ergebnisse anzeigen
sudo -u hdfs hadoop fs -ls /user/hdfs/output01
sudo -u hdfs hadoop fs -cat /user/hdfs/output01/part-r-*
[root@xxxxxxxxx main]# ./scripts/execute-wordcount.sh
Wenn der Job erfolgreich ist, wird unter dem Ausgabepfad eine Datei mit dem Namen _SUCCESS generiert. Sie können auch sehen, dass die Ausgabeergebnisse in einer oder mehreren Dateien im Format part-r- * gespeichert sind, mit den folgenden Ergebnissen:
part-r-00000
apple 2
grape 1
lemon 3
Python (Hadoop Streaming) Hadoop Streaming ist eine Schnittstelle zum ** Ausführen von MapReduce-Anwendungen in anderen Sprachen als Java **. Es ist im Vergleich zu Javas MapReduce-Anwendung unpraktisch, da es Standard-E / A zum Übergeben von Daten verwendet, aber in vertrauten Sprachen entwickelt werden kann. Dieses Mal werde ich es mit Python versuchen.
In Hadoop Streaming muss zusätzlich zum Eingabezielpfad und zum Ausgabezielpfad der Pfad der Datei angegeben werden, in der die auszuführende Kartenverarbeitung und die zu reduzierende Verarbeitung definiert sind.
scripts/execute-wordcount-python.sh
#!/usr/bin/env bash
#Führen Sie Hadoop Streaming aus
sudo -u hdfs hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.6.0-mr1-cdh5.16.2.jar \
-input /user/hdfs/input/input.txt -output /user/hdfs/output02 \
-mapper /main/streaming/python/map.py -reducer /main/streaming/python/reduce.py
#Ergebnisse anzeigen
sudo -u hdfs hadoop fs -ls /user/hdfs/output02
sudo -u hdfs hadoop fs -cat /user/hdfs/output02/part-*
map.py generiert den Schlüsselwert von <word 1> aus der Standardeingabe und gibt ihn an die Standardausgabe aus.
streaming/python/map.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import re
import sys
#Teilen Sie eine Zeile durch ein Leerzeichen(Wort, 1)Schlüsselwert von generieren
def map_fn(line):
return [(key, 1) for key in re.split(r'\s', line.strip()) if key]
#Ausgabeschlüsselwert zur Standardausgabe
def output(records):
for key, value in records:
print '{0}\t{1}'.format(key, value)
#Empfangen Sie Eingaben von der Standardeingabe
for l in sys.stdin:
output(map_fn(l))
redu.py zählt tatsächlich, wie oft ein Wort erscheint, und gibt das endgültige Verarbeitungsergebnis an die Standardausgabe aus.
streaming/python/reduce.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import re
import sys
results = {}
#Zählen Sie, wie oft ein Wort erscheint
def reduce_fn(line):
key, value = re.split(r'\t', line.strip())
if not key in results:
results[key] = 0
results[key] = results[key] + int(value)
#Ausgabeschlüsselwert (endgültiges Verarbeitungsergebnis) an die Standardausgabe
def output(records):
for k, v in records:
print '{0}\t{1}'.format(k, v)
#Empfangen Sie eine Kartenverarbeitungsausgabe von der Standardeingabe
for l in sys.stdin:
reduce_fn(l)
output(sorted(results.items()))
Wenn die Eingabedatei mit Java identisch ist, erhalten Sie wahrscheinlich ähnliche Ergebnisse aus der Datei im Zielpfad.
part-00000
apple 2
grape 1
lemon 3
Es gibt viele andere Hauptkomponenten von Hadoop, aber es ist schwierig, alle zu sehen, daher werde ich jedem von ihnen nach Bedarf weitere Elemente hinzufügen.
Komponente | Überblick |
---|---|
Pig | DSL heißt Pig Latin(Domain Specific Language)Sie können den Prozess mit definieren, wodurch es einfacher wird, MapReduce-Anwendungen mit weniger Code als Java zu erstellen. |
Hive | Sie können den Prozess in einem SQL-ähnlichen DSL namens HiveQL definieren. |
HBase | Eine verteilte NoSQL-Datenbank, die auf HDFS basiert. Es ist ein System zur Ergänzung der Teile, in denen HDFS nicht gut ist. |
Dieses Mal schrieb ich einen Artikel als Zusammenfassung, als ich mit Hadoop anfing. Es gibt einige Teile, von denen wir nur eine grobe Vorstellung haben, und einige der Hauptkomponenten von Hadoop, die wir nur über die Übersicht kennen, sodass wir weiter lernen werden. Darüber hinaus werden in der Cloud verschiedene verwaltete Dienste wie AWS und GCP bereitgestellt. Daher möchte ich die Unterschiede in der Benutzerfreundlichkeit kennenlernen, indem ich sie tatsächlich verschiebe.
Wenn es Fehler gibt, fordern Sie bitte eine Korrektur an. Bitte teilen Sie uns auch mit, ob es nützliche Websites gibt!
Recommended Posts