Je me demandais si je pourrais facilement créer une application MapReduce avec Python à laquelle je suis habitué et l'exécuter avec ** Amazon EMR **, mais elle s'appelait mrjob. J'ai découvert le framework Python. Il existe d'autres options telles que PySpark, mais c'est un framework qui a un coût d'apprentissage inférieur et est plus facile à mettre en œuvre. Donc, dans cet article, je voudrais décrire comment exécuter une application créée avec mrjob sur Amazon EMR. J'essaierai également de l'exécuter sur ** Cloud Dataproc ** de GCP.
mrjob est un framework qui vous permet d'écrire des applications en Python qui s'exécutent dans des clusters Hadoop. ** Vous pouvez facilement exécuter des applications MapReduce localement ou dans le cloud sans aucune expertise Hadoop. ** mrjob s'exécute sur Hadoop Streaming en interne. Qu'est-ce que Hadoop en premier lieu? Dans ce cas, le plan est décrit dans cet article, veuillez donc vous y référer si vous le souhaitez.
Les applications MapReduce divisent généralement l'ensemble de données d'entrée et effectuent des ** traitements de carte **, ** (traitement aléatoire) **, ** traitement de réduction **. Vous pouvez également utiliser le ** Traitement combiné **, qui effectue un processus d'agrégation intermédiaire, avant de transmettre le résultat de sortie du traitement de mappage au traitement de réduction.
L'application implémentée cette fois est un programme qui compte le nombre de fois qu'un mot apparaît dans un document. Il devrait recevoir les informations suivantes.
input.txt
We are the world, we are the children
We are the ones who make a brighter day
Prend des paires clé-valeur en entrée, ligne par ligne, et renvoie zéro ou plusieurs paires clé-valeur.
Input
Input: (None, "we are the world, we are the children")
Output
Output:
"we", 1
"are", 1
"the", 1
"world", 1
"we", 1
"are", 1
"the", 1
"children", 1
Il prend une clé ligne par ligne et une liste de ses valeurs comme entrée et renvoie zéro ou plusieurs paires clé-valeur.
Input
Input: ("we", [1, 1])
Output
Output:
"we", 2
Il prend une liste de clés et leurs valeurs comme entrée et renvoie zéro ou plusieurs paires clé-valeur en sortie.
Input
Input: ("we", [2, 1]) #1ère ligne"we"2 fois, 2e ligne"we"Apparaît une fois
Output
Output:
"we", 3
Créez un environnement pour créer des applications MapReduce en Python.
Il peut être installé à partir de PyPI. Cette fois, j'utilise la version ** 0.7.1 **.
pip install mrjob==0.7.1
Maintenant, écrivons le processus ci-dessus dans mrjob. Si le processus est simple et peut être décrit en une seule étape, créez une classe qui hérite de ** mrjob.job.MRJob ** comme suit.
mr_word_count.py
import re
from mrjob.job import MRJob
WORD_RE = re.compile(r"[\w']+")
class MRWordCount(MRJob):
#Traitez la saisie ligne par ligne,(mot, 1)Générer une valeur clé de
def mapper(self, _, line):
for word in WORD_RE.findall(line):
yield word.lower(), 1
#Prend une clé ligne par ligne et une liste de ses valeurs comme entrée et totaux
def combiner(self, word, counts):
yield word, sum(counts)
#Prenez une liste de clés et leurs valeurs comme entrée et somme
def reducer(self, word, counts):
yield word, sum(counts)
if __name__ == '__main__':
MRWordCount.run()
Si le processus est plus compliqué et nécessite plusieurs étapes, définissez le processus en passant ** mrjob.step.MRStep ** à la fonction ** steps ** de MRJob comme suit: peut faire.
mr_word_count.py
import re
from mrjob.job import MRJob
from mrjob.step import MRStep
WORD_RE = re.compile(r"[\w']+")
class MRWordCount(MRJob):
def steps(self):
return [
MRStep(mapper=self.mapper_get_words,
combiner=self.combiner_count_words,
reducer=self.reducer_count_words),
MRStep(mapper=...,
combiner=...,
reducer=...),
...
]
def mapper_get_words(self, _, line):
for word in WORD_RE.findall(line):
yield word.lower(), 1
def combiner_count_words(self, word, counts):
yield word, sum(counts)
def reducer_count_words(self, word, counts):
yield word, sum(counts)
if __name__ == '__main__':
MRWordCount.run()
Essayez d'exécuter l'application localement, dans un environnement Amazon EMR ou Cloud Dataproc. La structure du package est comme ça.
Configuration du package
.
├── config.yaml
├── input.txt
└── src
├── __init__.py
└── mr_word_count.py
Lors de l'exécution de l'application, vous pouvez spécifier des options (https://mrjob.readthedocs.io/en/latest/guides/configs-reference.html) telles que le chemin d'entrée. Il existe deux options, ** pré-écrites dans le fichier de configuration ** et ** transmises depuis la console **. Si l'option spécifiée est couverte, ** la valeur transmise depuis la console est prioritaire. ** Le fichier Config peut être défini sous la forme yaml ou json.
Pour exécuter l'application créée par mrjob, entrez ce qui suit à partir de la console:
python {Chemin de l'application} -r {Environnement d'exécution} -c {Chemin du fichier de configuration} < {Chemin d'entrée} > {Chemin de sortie}
Si vous l'exécutez localement, vous n'avez pas besoin de spécifier l'environnement d'exécution ou le chemin d'accès au fichier de configuration si vous n'en avez pas besoin. Si vous ne spécifiez pas le chemin de sortie, il sera émis vers la sortie standard.
python src/mr_word_count.py < input.txt
#Si vous devez passer le chemin de l'environnement d'exécution ou du fichier de configuration
python src/mr_word_count.py -r local -c config.yaml < input.txt
L'exécution de la commande ci-dessus exécutera l'application et imprimera un résultat similaire au suivant dans la sortie standard:
"world" 1
"day" 1
"make" 1
"ones" 1
"the" 3
"we" 3
"who" 1
"brighter" 1
"children" 1
"a" 1
"are" 3
Amazon EMR Pour s'exécuter sur Amazon EMR, ** AWS_ACCESS_KEY_ID ** et ** AWS_SECRET_ACCESS_KEY ** doivent être définis dans les variables d'environnement ou dans le fichier de configuration. Vous pouvez également définir le type d'instance et le nombre de cœurs. L'application et ses fichiers associés sont téléchargés sur S3 avant leur exécution.
config.yaml
runners:
emr:
aws_access_key_id: <your key ID>
aws_secret_access_key: <your secret>
instance_type: c1.medium
num_core_instances: 4
D'autres options peuvent être trouvées ici (https://mrjob.readthedocs.io/en/latest/guides/emr-opts.html).
Lorsque vous l'exécutez, sélectionnez ** emr ** pour votre environnement.
python src/mr_word_count.py -r emr -c config.yaml input.txt --output-dir=s3://emr-test/output/
Cloud Dataproc Pour tirer parti de Cloud Dataproc, activez l'API Dataproc de GCP. Ensuite, spécifiez le chemin du fichier d'informations d'identification dans la variable d'environnement ** GOOGLE_APPLICATION_CREDENTIALS **. Dans le fichier de configuration, définissez la zone, le type et le nombre de cœurs de l'instance. Une autre option est ici (https://mrjob.readthedocs.io/en/latest/guides/dataproc-opts.html).
config.yaml
runners:
dataproc:
zone: us-central1-a
instance_type: n1-standard-1
num_core_instances: 2
Lorsque vous l'exécutez, sélectionnez ** dataproc ** pour votre environnement.
python src/mr_word_count.py -r dataproc -c config.yaml input.txt --output-dir=gs://dataproc-test/output/
En utilisant mrjob, j'ai pu créer facilement une application MapReduce en Python et j'ai pu facilement exécuter l'application créée dans l'environnement cloud. De plus, mrjob a beaucoup de documents, il est donc facile de démarrer et il est très facile de démarrer si vous voulez exécuter facilement des tâches MapReduce en Python. C'était un cadre pratique.
Recommended Posts