Als erster Schritt zur Überprüfung von Apache Spark. Wie jeder, der Erfahrung mit Hadoop hat, gut weiß, zählt derjenige die gleichen Wörter in der Datei. Die Umgebung ist Mac OSX, aber ich frage mich, ob es für Linux fast dasselbe ist. Der vollständige Code lautet hier.
$ brew install apache-spark
OK, wenn die Spark-Shell funktioniert und `scala>`
angezeigt wird
$ /usr/local/bin/spark-shell
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.6.1
/_/
Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
16/04/07 16:44:47 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/04/07 16:44:47 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/04/07 16:44:51 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/04/07 16:44:51 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
16/04/07 16:44:53 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/04/07 16:44:53 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/04/07 16:44:56 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/04/07 16:44:56 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
SQL context available as sqlContext.
scala>
Dies wurde unter Bezugnahme auf die Beschreibung auf der offiziellen Website geschrieben.
Bitte bereiten Sie sich wie folgt vor.
$ tree
.
├── input
│ └── data #Zu lesender Text
└── wordcount.py #Ausführungsskript
1 directory, 4 files
Hier verwenden wir Python. Sie können in Scala oder Java schreiben. Ich bin gut darin, also lass uns gehen. So was.
wordcount.py
#!/usr/bin/env python
# coding:utf-8
from pyspark import SparkContext
def execute(sc, src, dest):
'''
Führen Sie die Wortzählung durch
'''
#Lesen Sie die src-Datei
text_file = sc.textFile(src)
counts = text_file.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b)
#Ergebnisse exportieren
counts.saveAsTextFile(dest)
if __name__ == '__main__':
sc = SparkContext('local', 'WordCount')
src = './input'
dest = './output'
execute(sc, src, dest)
Passend. Zum Beispiel so.
./input/data
aaa
bbb
ccc
aaa
bbbb
ccc
aaa
Der folgende Befehl.
$ which pyspark
/usr/local/bin/pyspark
#Lauf
$ pyspark ./wordcount.py
Wenn Sie es ausführen, fließt ein Protokoll. (Wie Hadoop Streaming)
./output/part-00000
(u'aaa', 3)
(u'bbbb', 1)
(u'bbb', 1)
(u'ccc', 2)
Es wurde richtig gezählt.
Beachten Sie, dass der nächste Prozess fehlschlägt, wenn das Ausgabezielverzeichnis (./output) bereits generiert wurde. Es ist eine gute Idee, eine Shell wie die folgende an dasselbe Verzeichnis anzuhängen.
exec.sh
#!/bin/bash
rm -fR ./output
/usr/local/bin/pyspark ./wordcount.py
echo ">>>>> result"
cat ./output/*
$ sh exec.sh
・ ・ ・
>>>>> result
(u'aaa', 3)
(u'bbbb', 1)
(u'bbb', 1)
(u'ccc', 2)
Recommended Posts