Ich habe mich gefragt, ob ich mit Python problemlos eine MapReduce-Anwendung erstellen und mit ** Amazon EMR ** ausführen kann, aber sie heißt mrjob. Ich habe etwas über das Python-Framework gelernt. Es gibt andere Optionen wie PySpark, aber es ist ein Framework, das geringere Lernkosten hat und einfacher zu implementieren ist. In diesem Artikel möchte ich beschreiben, wie eine mit mrjob erstellte Anwendung auf Amazon EMR ausgeführt wird. Ich werde auch versuchen, es auf GCPs ** Cloud Dataproc ** auszuführen.
mrjob ist ein Framework, mit dem Sie Anwendungen in Python schreiben können, die in Hadoop-Clustern ausgeführt werden. ** Sie können MapReduce-Anwendungen problemlos lokal oder in der Cloud ausführen, ohne über Hadoop-Kenntnisse zu verfügen. ** mrjob läuft intern auf Hadoop Streaming. Was ist Hadoop überhaupt? In diesem Fall wird die Gliederung in diesem Artikel beschrieben. Wenn Sie möchten, lesen Sie sie bitte.
MapReduce-Anwendungen teilen normalerweise das Eingabedatensatz auf und führen ** Kartenverarbeitung **, ** (Zufallsverarbeitung) **, ** Verarbeitung reduzieren ** durch. Sie können auch ** Kombinierte Verarbeitung ** verwenden, die einen Zwischenaggregationsprozess ausführt, bevor Sie das Ausgabeergebnis der Kartenverarbeitung an Reduzieren der Verarbeitung übergeben.
Die diesmal implementierte Anwendung ist ein Programm, das zählt, wie oft ein Wort in einem Dokument erscheint. Es wird erwartet, dass die folgende Eingabe empfangen wird.
input.txt
We are the world, we are the children
We are the ones who make a brighter day
Nimmt Schlüssel-Wert-Paare Zeile für Zeile als Eingabe und gibt null oder mehr Schlüssel-Wert-Paare zurück.
Input
Input: (None, "we are the world, we are the children")
Output
Output:
"we", 1
"are", 1
"the", 1
"world", 1
"we", 1
"are", 1
"the", 1
"children", 1
Es verwendet einen zeilenweisen Schlüssel und eine Liste seiner Werte als Eingabe und gibt null oder mehr Schlüssel-Wert-Paare zurück.
Input
Input: ("we", [1, 1])
Output
Output:
"we", 2
Es nimmt eine Liste von Schlüsseln und ihren Werten als Eingabe und gibt null oder mehr Schlüssel-Wert-Paare als Ausgabe zurück.
Input
Input: ("we", [2, 1]) #1. Zeile"we"2 mal, 2. Zeile"we"Erscheint einmal
Output
Output:
"we", 3
Erstellen Sie eine Umgebung zum Erstellen von MapReduce-Anwendungen in Python.
Es kann von PyPI installiert werden. Dieses Mal verwende ich Version ** 0.7.1 **.
pip install mrjob==0.7.1
Schreiben wir nun den obigen Prozess in mrjob. Wenn der Prozess einfach ist und in einem Schritt beschrieben werden kann, erstellen Sie eine Klasse, die ** mrjob.job.MRJob ** wie folgt erbt.
mr_word_count.py
import re
from mrjob.job import MRJob
WORD_RE = re.compile(r"[\w']+")
class MRWordCount(MRJob):
#Verarbeiten Sie die Eingabe zeilenweise,(Wort, 1)Schlüsselwert von generieren
def mapper(self, _, line):
for word in WORD_RE.findall(line):
yield word.lower(), 1
#Nimmt einen zeilenweisen Schlüssel und eine Liste seiner Werte als Eingabe und Summen
def combiner(self, word, counts):
yield word, sum(counts)
#Nehmen Sie eine Liste der Schlüssel und ihrer Werte als Eingabe und Summe
def reducer(self, word, counts):
yield word, sum(counts)
if __name__ == '__main__':
MRWordCount.run()
Wenn der Prozess komplizierter ist und mehrere Schritte erfordert, definieren Sie den Prozess, indem Sie ** mrjob.step.MRStep ** wie folgt an MRJobs ** Schritte ** -Funktion übergeben: tun können.
mr_word_count.py
import re
from mrjob.job import MRJob
from mrjob.step import MRStep
WORD_RE = re.compile(r"[\w']+")
class MRWordCount(MRJob):
def steps(self):
return [
MRStep(mapper=self.mapper_get_words,
combiner=self.combiner_count_words,
reducer=self.reducer_count_words),
MRStep(mapper=...,
combiner=...,
reducer=...),
...
]
def mapper_get_words(self, _, line):
for word in WORD_RE.findall(line):
yield word.lower(), 1
def combiner_count_words(self, word, counts):
yield word, sum(counts)
def reducer_count_words(self, word, counts):
yield word, sum(counts)
if __name__ == '__main__':
MRWordCount.run()
Versuchen Sie, die Anwendung lokal in einer Amazon EMR- oder Cloud Dataproc-Umgebung auszuführen. Die Paketstruktur ist wie folgt.
Paketkonfiguration
.
├── config.yaml
├── input.txt
└── src
├── __init__.py
└── mr_word_count.py
Beim Ausführen der Anwendung können Sie Optionen (https://mrjob.readthedocs.io/en/latest/guides/configs-reference.html) wie den Eingabepfad angeben. Es gibt zwei Optionen: ** in der Konfigurationsdatei vorab geschrieben ** und ** von der Konsole übergeben **. Wenn die angegebene Option abgedeckt ist, hat ** der von der Konsole übergebene Wert Vorrang. ** Die Konfigurationsdatei kann in der Form yaml oder json definiert werden.
Geben Sie Folgendes in die Konsole ein, um die von mrjob erstellte Anwendung auszuführen:
python {Anwendungspfad} -r {Ausführungsumgebung} -c {Konfigurationsdateipfad} < {Eingabepfad} > {Ausgabepfad}
Wenn Sie es lokal ausführen, müssen Sie weder die Ausführungsumgebung noch den Pfad zur Konfigurationsdatei angeben, wenn Sie es nicht benötigen. Wenn Sie den Ausgabepfad nicht angeben, wird er an die Standardausgabe ausgegeben.
python src/mr_word_count.py < input.txt
#Wenn Sie den Pfad der Ausführungsumgebung oder der Konfigurationsdatei übergeben müssen
python src/mr_word_count.py -r local -c config.yaml < input.txt
Wenn Sie den obigen Befehl ausführen, wird die Anwendung ausgeführt und ein Ergebnis gedruckt, das dem folgenden in der Standardausgabe ähnelt:
"world" 1
"day" 1
"make" 1
"ones" 1
"the" 3
"we" 3
"who" 1
"brighter" 1
"children" 1
"a" 1
"are" 3
Amazon EMR Um unter Amazon EMR ausgeführt zu werden, müssen ** AWS_ACCESS_KEY_ID ** und ** AWS_SECRET_ACCESS_KEY ** in Umgebungsvariablen oder in der Konfigurationsdatei festgelegt werden. Sie können auch den Instanztyp und die Anzahl der Kerne festlegen. Die Anwendung und die zugehörigen Dateien werden vor ihrer Ausführung in S3 hochgeladen.
config.yaml
runners:
emr:
aws_access_key_id: <your key ID>
aws_secret_access_key: <your secret>
instance_type: c1.medium
num_core_instances: 4
Weitere Optionen finden Sie hier (https://mrjob.readthedocs.io/en/latest/guides/emr-opts.html).
Wenn Sie es ausführen, wählen Sie ** emr ** für Ihre Umgebung.
python src/mr_word_count.py -r emr -c config.yaml input.txt --output-dir=s3://emr-test/output/
Cloud Dataproc Aktivieren Sie die Dataproc-API von GCP, um Cloud Dataproc nutzen zu können. Geben Sie dann den Pfad der Anmeldeinformationsdatei in der Umgebungsvariablen ** GOOGLE_APPLICATION_CREDENTIALS ** an. Legen Sie in der Konfigurationsdatei die Zone, den Typ und die Anzahl der Kerne der Instanz fest. Eine weitere Option finden Sie hier (https://mrjob.readthedocs.io/en/latest/guides/dataproc-opts.html).
config.yaml
runners:
dataproc:
zone: us-central1-a
instance_type: n1-standard-1
num_core_instances: 2
Wenn Sie es ausführen, wählen Sie ** dataproc ** für Ihre Umgebung.
python src/mr_word_count.py -r dataproc -c config.yaml input.txt --output-dir=gs://dataproc-test/output/
Mit mrjob konnte ich problemlos eine MapReduce-Anwendung in Python erstellen und die erstellte Anwendung problemlos in einer Cloud-Umgebung ausführen. Außerdem verfügt mrjob über zahlreiche Dokumente, sodass der Einstieg einfach und der Einstieg sehr einfach ist, wenn Sie MapReduce-Jobs in Python einfach ausführen möchten. Es war ein bequemer Rahmen.