Blitzschnelles Cluster-Computing. Eine Bibliothek, die die Stapelverarbeitung in großem Maßstab verteilt. Die verteilte Verarbeitung ist gut. Sie können SQL verwenden. Streaming-Daten können verwendet werden. Kann für maschinelles Lernen verwendet werden. Die Graphentheorie kann verwendet werden. Deep Learning kann platziert werden. Diese nutzen den Speicher voll aus und verteilen den Cluster mit hoher Geschwindigkeit.
Ubuntu
sudo apt-get install -y openjdk-8-jdk
Mac
brew cask install java
Ubuntu
sudo apt install maven
mac
brew install maven
Sei / usr / local / spark SPARK_HOME. Wählen Sie eine beliebige Version. http://ftp.riken.jp/net/apache/spark/
Ubuntu
wget http://ftp.riken.jp/net/apache/spark/spark-1.6.2/spark-1.6.2-bin-hadoop2.6.tgz
$ tar zxvf spark-1.6.2-bin-hadoop2.6.tgz
$ sudo mv spark-1.6.2-bin-hadoop2.6 /usr/local/
$ sudo ln -s /usr/local/spark-1.6.2-bin-hadoop2.6 /usr/local/spark
Fügen Sie Folgendes zu .bashrc hinzu
Ubuntu
export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin
Mac
brew install apache-spark
python
$ spark-shell --master local[*]
(Unterlassung)
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.6.1
/_/
Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.
(Unterlassung)
scala> val textFile = sc.textFile("/usr/local/spark/README.md")
textFile: org.apache.spark.rdd.RDD[String] = /usr/local/spark/README.md MapPartitionsRDD[1] at textFile at <console>:27
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:29
scala> wordCounts.collect()
res0: Array[(String, Int)] = Array((package,1), (For,2), (Programs,1), (processing.,1), ...(Unterlassung)..., (>>>,1), (programming,1), (T...
scala>
Überprüfen Sie, ob es auf der Konsole funktioniert. Bei Verwendung mit Python
python
./bin/pyspark
Fügen Sie Folgendes zu .bashrc hinzu
python
#spark
export SPARK_HOME=/usr/local/spark/spark-1.6.2-bin-hadoop2.6
export PATH=$PATH:$SPARK_HOME/bin
#jupyter spark
export PYSPARK_PYTHON=$PYENV_ROOT/shims/python #Passen Sie den Pfad an die Umgebung an
export PYSPARK_DRIVER_PYTHON=$PYENV_ROOT/shims/jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
python
source .bashrc
pyspark
Durch Ausführen des Befehls pyspark wird jupyter gestartet. Wenn Sie einen Fehler erhalten, der die RDD von spark nicht erfasst, können Sie ihn beheben, indem Sie den Kernel neu starten.
Parallele Ausführung wird möglich.
Scala
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
Java
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
Python
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
Scala
val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt
Java
JavaRDD<String> distFile = sc.textFile("data.txt");
Python
distFile = sc.textFile("data.txt")
Holen Sie sich die Daten mit textFile und legen Sie sie auf rdd Mit Karte konvertieren Mit reduzieren aggregieren
Scala
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
Java
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
Python
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
Scala
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
Java
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});
Python
"""MyScript.py"""
if __name__ == "__main__":
def myFunc(s):
words = s.split(" ")
return len(words)
sc = SparkContext(...)
sc.textFile("file.txt").map(myFunc)
Scala
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
Java
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);
// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);
println("Counter value: " + counter);
Python
counter = 0
rdd = sc.parallelize(data)
# Wrong: Don't do this!!
def increment_counter(x):
global counter
counter += x
rdd.foreach(increment_counter)
print("Counter value: ", counter)
Scala
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
Java
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
Python
lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
Umwandlung | Bedeutung |
---|---|
map(func) | Konvertieren Sie in ein neues verteiltes Dataset, indem Sie jedes Element der Quelle mit der Funktion func übergeben. |
filter(func) | Wählt das Quellelement aus, für das func true zurückgibt und ein neues Dataset zurückgibt. |
flatMap(func) | Ähnlich wie bei der Zuordnung, jedoch kann jedes Eingabeelement 0 oder mehr Ausgabeelementen zugeordnet werden (func muss Seq anstelle eines einzelnen Elements zurückgeben). |
mapPartitions(func) | Ähnlich wie bei einer Map, jedoch einzeln auf jeder Partition (Block) des RDD ausgeführt. Wenn Sie also auf einem RDD vom Typ T ausgeführt werden, ist func ein Iterator |
mapPartitionsWithIndex(func) | Ähnlich wie bei mapPartitions, aber func erhält einen ganzzahligen Wert, der den Index der Partition darstellt. Wenn Sie mit RDD vom Typ T arbeiten, ist func daher (Int, Iterator). |
sample(withReplacement, fraction, seed) | Verwendet den angegebenen Startwert des Zufallszahlengenerators, um einen Bruchteil der Daten mit oder ohne Substitution abzutasten. |
union(otherDataset) | Gibt ein neues Dataset zurück, das die Summe der Elemente und Argumente im Quelldatensatz enthält. |
intersection(otherDataset) | Gibt eine neue RDD zurück, die die gemeinsamen Teile der Elemente und Argumente im Quelldatensatz enthält. |
distinct([numTasks])) | Gibt ein neues Dataset zurück, das verschiedene Elemente des Quelldatensatzes enthält. |
groupByKey([numTasks]) | Beim Aufruf mit dem Datensatzsatz (K, V) (K, Iterable) |
reduceByKey(func, [numTasks]) | Bei einem Aufruf mit einem Datensatzpaar (K, V) beträgt der Wert jedes Schlüssels (V, V).=>Aggregiert mit der typisierten Reduktionsfunktion func (K, V) V..Wie bei groupByKey kann die Anzahl der Reduzierungsaufgaben über das optionale zweite Argument festgelegt werden. |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | Beim Aufruf mit einem Datensatzpaar (K, V) wird ein Datensatzpaar (K, U) zurückgegeben. Hier wird der Wert jedes Schlüssels unter Verwendung der angegebenen Verknüpfungsfunktion und eines neutralen "Null" -Werts aggregiert. Ermöglichen Sie Aggregatwerttypen, die sich von Eingabewerttypen unterscheiden, und vermeiden Sie unnötige Zuordnungen. Wie bei groupByKey kann die Anzahl der Reduzierungsaufgaben mit dem optionalen zweiten Argument festgelegt werden. |
sortByKey([ascending], [numTasks]) | Wenn K mit einem Datensatz von (K, V) Paaren aufgerufen wird, der Ordered implementiert, wird ein Datensatz von (K, V) Paaren nach aufsteigendem oder absteigendem Schlüssel sortiert, wie durch das aufsteigende Argument von Boolean angegeben. Kehrt zurück. |
join(otherDataset, [numTasks]) | Beim Aufruf mit Datensätzen vom Typ (K, V) und (K, W) wird ein Datensatz von (K, (V, W)) Paaren zurückgegeben, der alle Elementpaare für jeden Schlüssel enthält. Äußere Verknüpfungen werden von leftOuterJoin, rightOuterJoin und fullOuterJoin unterstützt. |
cogroup(otherDataset, [numTasks]) | (K、(Iterable |
cartesian(otherDataset) | Beim Aufruf eines Datensatzes vom Typ T und vom Typ U wird ein Datensatz von (T, U) -Paaren (alle Elementpaare) zurückgegeben. |
pipe(command, [envVars]) | Shell-Befehle für jede Partition des RDD (z. B. Perl- oder Bash-Skript. RDD-Elemente werden in das stdin des Prozesses geschrieben, und die in stdout gedruckten Zeilen werden als RDDs zurückgegeben. |
coalesce(numPartitions) | Reduzieren Sie die Anzahl der Partitionen in der RDD auf numPartitions. Dies ist nützlich, um Vorgänge nach dem Filtern eines großen Datensatzes effizienter auszuführen. |
repartition(numPartitions) | Mischen Sie die Daten in der RDD nach dem Zufallsprinzip neu, um mehr oder weniger Partitionen zu erstellen und zwischen diesen Partitionen auszugleichen. Dies stellt sicher, dass alle Daten im Netzwerk immer gemischt werden. |
repartitionAndSortWithinPartitions(partitioner) | Partitioniert das RDD gemäß dem angegebenen Partitionierer neu und sortiert die Datensätze nach Schlüssel innerhalb jeder resultierenden Partition. Dies ist effizienter als das Aufrufen einer Unterteilung und es ist effizienter, innerhalb jeder Partition zu sortieren, da Sie die Sortierung auf den Shuffle-Mechanismus verschieben können. |
Aktion | Bedeutung |
---|---|
reduce(func) | Verwenden Sie die Funktion func (die zwei Argumente akzeptiert und eines zurückgibt), um die Elemente des Datasets zu aggregieren. Funktionen müssen konvertierbar und assoziativ sein, damit sie parallel korrekt berechnet werden können. |
collect() | Das Treiberprogramm gibt alle Elemente des Datasets als Array zurück. Dies ist normalerweise nach Filtern und anderen Vorgängen nützlich, die eine ausreichend kleine Teilmenge der Daten zurückgeben. |
count() | Gibt die Anzahl der Elemente im Dataset zurück. |
first() | Gibt das erste Element des Datasets zurück (ähnlich wie bei take (1)). |
take(n) | Gibt ein Array zurück, das die ersten n Elemente des Datasets enthält. |
takeSample(withReplacement, num, [seed]) | Gibt ein Array zurück, das Zufallsstichproben der num-Elemente des Datasets enthält, die mit einem Zufallsgenerator-Seed vordefiniert sind, mit oder ohne Substitution. |
takeOrdered(n, [ordering]) | Gibt die ersten n Elemente der RDD in natürlicher Reihenfolge oder mit einem benutzerdefinierten Komparator zurück. |
saveAsTextFile(path) | Beschreiben Sie die Elemente der Datendatei als Textdatei (oder als Satz von Textdateien) in einem bestimmten Verzeichnis in Ihrem lokalen Dateisystem, HDFS oder einem anderen von Hadoop unterstützten Dateisystem. Spark ruft den toString jedes Elements auf, um es in eine Textzeile in der Datei zu konvertieren. |
saveAsSequenceFile(path) | |
(Java and Scala) | Schreibt die Elemente der Datendatei als Hadoop-Sequenzdatei in den angegebenen Pfad des lokalen Dateisystems, HDFS oder eines anderen von Hadoop unterstützten Dateisystems. Es ist in der RDD von Schlüssel / Wert-Paaren verfügbar, die die beschreibbare Schnittstelle von Hadoop implementieren. In Scala können Sie auch Typen verwenden, die implizit in Writable konvertiert werden können (Spark umfasst die Konvertierung von Basistypen wie Int, Double, String). |
saveAsObjectFile(path) | |
(Java and Scala) | Verwenden Sie die Java-Serialisierung, um die Elemente eines Datasets in einem einfachen Format zu beschreiben.Die Java-Serialisierung ist SparkContext.Es kann mit objectFile () geladen werden. |
countByKey() | Nur für RDDs vom Typ (K, V) verfügbar. Gibt die Hashmap des (K, Int) -Paares zurück und zählt jeden Schlüssel. |
foreach(func) | Führen Sie die Funktionsfunktion für jedes Element des Datasets aus. Dies erfolgt normalerweise bei Nebenwirkungen wie Akkumulatoraktualisierungen und Interaktionen mit externen Speichersystemen. Hinweis: Das Ändern anderer Variablen als Akkumulatoren außerhalb von foreach () kann zu undefiniertem Verhalten führen. Weitere Informationen finden Sie unter Grundlegendes zu Abschlüssen. |
Ich gabelte mich, weil es sehr leicht zu verstehen war, was bei Wettbewerben in Übersee verwendet wurde. Da es sich um einen Jupyter handelt, führen Sie ihn nur in der Reihenfolge von oben aus.
Klicken Sie hier für die Quelle https://github.com/miyamotok0105/spark-py-notebooks
Informationen zum Lesen und Parallelisieren von Dateien
Über Karte, Filter, Sammeln
Erläutert die RDD-Stichprobenmethode.
Eine kurze Einführung in einige RDD-Pseudo-Set-Operationen.
Über RDD-Aktionen reduzieren, falten, aggregieren.
Umgang mit Schlüssel / Wert-Paaren zum Aggregieren und Erkunden von Daten.
Ein Notizbuch, das die grundlegenden Statistiken von MLlib für lokale Vektortypen, explorative Datenanalyse und Modellauswahl enthält.
Klassifizierung markierter Punkte und logistischer Regressionen für Netzwerkangriffe in MLlib. Anwendung der Modellauswahlmethode unter Verwendung der Korrelationsmatrix und des Hypothesentests.
Eine Methode, die die Verwendung baumbasierter Methoden und die Auswahl von Modellen und Features erklärt.
Dieses Notizbuch leitet das Schema für einen Datensatz von Netzwerkinteraktionen ab. Auf dieser Grundlage verwenden wir die SQL DataFrame-Abstraktion von Spark, um eine strukturiertere explorative Datenanalyse durchzuführen.
Clustering-Prozess von Irisdaten.
data_file = "./kddcup.data_10_percent.gz"
#Allgemeine Schöpfung
raw_data = sc.textFile(data_file)
#Parallel erstellen
raw_data = sc.parallelize(data_file)
#Filterkonvertierung
normal_raw_data = raw_data.filter(lambda x: 'normal.' in x)
#Kartenkonvertierung
csv_data = raw_data.map(lambda x: x.split(","))
Gibt ein Array zurück, das Zufallsstichproben der num-Elemente des Datasets enthält, die mit einem Zufallsgenerator-Startwert vorgegeben sind.
raw_data_sample = raw_data.takeSample(False, 400000, 1234)
normal_raw_data = raw_data.filter(lambda x: "normal." in x)
#Subtrahieren
attack_raw_data = raw_data.subtract(normal_raw_data)
#Kartesisches Produkt (direktes Produktset)
product = protocols.cartesian(services).collect()
print "There are {} combinations of protocol X service".format(len(product))
Es handelt sich um eine Produktempfehlung für Kunden, die eine Matrix aus Benutzern und Artikeln verwendet. Aus dieser Matrix kann gesagt werden, dass es sich um einen Mechanismus handelt, um die Korrelation von Benutzern zu analysieren und Empfehlungen abzugeben, die auf der Annahme beruhen, dass ähnliche Benutzer die Produkte kaufen, die sie kaufen. Referenz
Kooperative Filterung
Inhaltsbasierte (inhaltsbasierte) Filterung
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
model = ALS.train(ratings, rank, numIterations)
predictions_all = model.predictAll(sc.parallelize(f_XY)).map(lambda r: ((r[0], r[1]), limitter(r[2]) ))
https://www.oreilly.co.jp/books/9784873117508/
Quelle herunterladen. Ganz Scala. Und dieses Buch hat eine ziemlich starke Scala-Farbe. Ich schreibe unter der Annahme, dass ich Spark kenne.
https://github.com/sryza/aas.git
git checkout 1st-edition
Daten bekommen
wget http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz
tar xzvf profiledata_06-May-2005.tar.gz
Ergebnis
profiledata_06-May-2005/
profiledata_06-May-2005/artist_data.txt
profiledata_06-May-2005/README.txt
profiledata_06-May-2005/user_artist_data.txt
profiledata_06-May-2005/artist_alias.txt
Quelle https://github.com/sryza/aas/blob/1st-edition/ch03-recommender/src/main/scala/com/cloudera/datascience/recommender/RunRecommender.scala
BigDL(torch base) https://github.com/intel-analytics/BigDL TensorFlow https://github.com/yahoo/TensorFlowOnSpark keras https://github.com/maxpumperla/elephas
http://qiita.com/joemphilips/items/de5d12723b9b88b5b090
Sicher funktioniert das. Ich war in Schwierigkeiten, weil ich einen Fehler mit permmision bekommen habe, aber ich habe das Gefühl, dass ich einen Fehler bekommen habe, weil ich überhaupt nicht genug Ordner und Dateien hatte. Ich erinnere mich, dass ich etwas hinzugefügt habe, indem ich mir das Fehlerprotokoll von Spark oder so angesehen habe.
http://spark.apache.org/docs/latest/programming-guide.html Mastering Apache Spark 2 https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-mllib/spark-mllib-transformers.html
http://yuroyoro.hatenablog.com/entry/20100317/1268819400
Liste ist wichtig in Scala Verwenden Sie die Case-Klasse Unveränderliches Programm
http://en.wikipedia.org/wiki/Collaborative_filtering http://d.hatena.ne.jp/EulerDijkstra/20130407/1365349866 https://www.slideshare.net/hoxo_m/ss-53305070
https://www.youtube.com/watch?v=qIs4nNFgi0s
Recommended Posts