export SPARK_HOME='/usr/local/bin/spark-2.2.0-bin-hadoop2.7'
Wenn Sie bei der Installation "without-hadoop" auswählen, müssen Sie hadoop einfügen und HADOOP_HOME festlegen. Bei Verwendung von PySpark
pip pyspark
Ich habe es in etwas gesteckt. Wenn sich die Version von spark unterscheidet, funktioniert sie möglicherweise nicht richtig. Wenn Sie also die Version angeben möchten
pip pyspark-2.2.1
Geben Sie dies an, indem Sie so etwas tun.
Die Verzeichnisstruktur sieht so aus. Der Inhalt wird unten erklärt.
$ tree
.
├── SimpleApp.scala
├── build.sbt
├── input.txt
└── run.sh
SimpleApp.scala entspricht fast dem Tutorial. Nur die Eingabe wurde ein wenig geändert. Lesen Sie die Textdatei und zählen Sie, wie viele "a" und "p" enthalten sind.
SimpleApp.scala
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
object SimpleApp {
def main(args: Array[String]) {
val logFile = "input.txt" // Should be some file on your system
val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
val logData = spark.read.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numPs = logData.filter(line => line.contains("p")).count()
println(s"Lines with a: $numAs, Lines with p: $numPs")
spark.stop()
}
}
Entsprechende Sätze in input.txt
this is a pen
this is an apple
apple pen
pen pen
sbt package
Generiert eine JAR-Datei unter dem Ziel basierend auf dem Inhalt von build.sbt. (Ziel wird zu diesem Zeitpunkt auch willkürlich erstellt) Beispiel build.sbt
build.sbt
name := "simple"
version := "1.0"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0"
In diesem Fall lautet die generierte JAR-Datei "target / scala-2.11 / simple_2.11-1.0.jar".
run.sh ist ein Skript zur Ausführung.
run.sh
spark-submit \
--class SimpleApp \
--master local[4] \
--conf spark.driver.host=localhost \
target/scala-2.11/simple_2.11-1.0.jar
Wenn ich "sh run.sh" starte, sind die Ergebnisse gestaffelt.
Lines with a: 3, Lines with p: 4
Es ist in Ordnung, wenn enthalten ist.
** Von den Eingängen enthalten 3 Zeilen a und 4 Zeilen p **.
Der Teil spark.driver.host = localhost
war nicht im Tutorial enthalten, aber wenn Sie dies nicht schreiben, wird es Ihre Umgebung sein
Error
Exception in thread "main" java.lang.AssertionError: assertion failed: Expected hostname
Kam raus, also habe ich es hinzugefügt.
Für Python ist es etwas einfacher. Die Dateistruktur ist wie folgt
$ tree
.
├── SimpleApp.py
├── input.txt
└── run.sh
SimpleApp.py
from pyspark.sql import SparkSession
logFile = "input.txt" # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()
numAs = logData.filter(logData.value.contains('a')).count()
numPs = logData.filter(logData.value.contains('p')).count()
print("Lines with a: %i, lines with p: %i" % (numAs, numPs))
spark.stop()
run.sh
spark-submit \
--master local[4] \
--conf spark.driver.host=localhost \
SimpleApp.py
input.txt ist das gleiche. Wenn Pypark in pip enthalten ist, müssen Sie das Skript nicht ausführen
python SimpleApp.py
Es scheint, dass es nur von selbst ausgeführt werden kann.
Dieses Mal starte ich Spark mit Spark Session. Sie können dasselbe mit SparkContext tun. Wenn Sie den Unterschied zwischen Spark-Kontext und Spark-Sitzung nicht verstehen, scheint in Spark-Sitzung Spark-Kontext enthalten zu sein, der nach Spark-Kontext angezeigt wird.
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
Erstellen Sie eine Spark-Instanz, indem Sie appName mit angeben.
logData = spark.read.text(logFile).cache()
Lesen Sie die Datei mit. cache () scheint eine Einstellung zu sein, die im Speicher verbleibt. (Es hat ohne diesen Code funktioniert) Dieses Mal lesen wir eine Textdatei, aber wir können auch strukturierte Daten wie CSV-Dateien und SQL-Tabellen lesen.
Die gelesenen Daten werden parallelisiert und mit "rdd" berechnet. Verschiedene Funktionen wie Map und Fileter werden in rdd vorbereitet.
Filter ist eine Methode zum wörtlichen Filtern. Dieses Mal lese ich die Datei Zeile für Zeile und benutze count (), um zu zählen, wie viele eine bestimmte Zeichenfolge enthalten.
numAs = logData.filter(logData.value.contains('a')).count()
SparkSession scheint als http-Server ausgeführt zu werden. Beenden Sie es also mit stop ()
am Ende.
spark.stop()
Recommended Posts