[PYTHON] Premiers pas avec Spark

Qu'est-ce que Spark

Informatique en cluster ultra-rapide. Une bibliothèque qui distribue le traitement par lots à grande échelle. Il fait un bon travail de traitement distribué. Vous pouvez utiliser SQL. Les données en streaming peuvent être utilisées. Peut être utilisé pour l'apprentissage automatique. La théorie des graphes peut être utilisée. L'apprentissage profond peut être placé. Ceux-ci utilisent pleinement la mémoire et distribuent le cluster à grande vitesse.

スクリーンショット 2017-03-03 1.11.09.png

Environnement éprouvé


Installation d'étincelles

Installation du JDK

Ubuntu


sudo apt-get install -y openjdk-8-jdk

Mac


brew cask install java

installation de maven

Ubuntu


sudo apt install maven

mac


brew install maven

Installation d'étincelles

Soit / usr / local / spark SPARK_HOME. Sélectionnez n'importe quelle version. http://ftp.riken.jp/net/apache/spark/

Ubuntu


wget http://ftp.riken.jp/net/apache/spark/spark-1.6.2/spark-1.6.2-bin-hadoop2.6.tgz
$ tar zxvf spark-1.6.2-bin-hadoop2.6.tgz
$ sudo mv spark-1.6.2-bin-hadoop2.6 /usr/local/
$ sudo ln -s /usr/local/spark-1.6.2-bin-hadoop2.6 /usr/local/spark

Ajoutez ce qui suit à .bashrc

Ubuntu


export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin

Mac


brew install apache-spark

exécution de la coquille d'étincelle

python


$ spark-shell --master local[*]
(Omission)
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.
(Omission)

scala> val textFile = sc.textFile("/usr/local/spark/README.md")
textFile: org.apache.spark.rdd.RDD[String] = /usr/local/spark/README.md MapPartitionsRDD[1] at textFile at <console>:27

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:29

scala> wordCounts.collect()
res0: Array[(String, Int)] = Array((package,1), (For,2), (Programs,1), (processing.,1), ...(Omission)...,  (>>>,1), (programming,1), (T...
scala>

Vérifiez si cela fonctionne sur la console. Lors de l'utilisation avec python

python


./bin/pyspark

Si vous voulez exécuter pyspark avec jupyter

Ajoutez ce qui suit à .bashrc

python


#spark                                                                                                                                                        
export SPARK_HOME=/usr/local/spark/spark-1.6.2-bin-hadoop2.6
export PATH=$PATH:$SPARK_HOME/bin
#jupyter spark
export PYSPARK_PYTHON=$PYENV_ROOT/shims/python #Faites correspondre le chemin en fonction de l'environnement
export PYSPARK_DRIVER_PYTHON=$PYENV_ROOT/shims/jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"

python


source .bashrc
pyspark

L'exécution de la commande pyspark lance jupyter. Si vous obtenez une erreur qui ne récupère pas le RDD de spark, vous pouvez la corriger en redémarrant le noyau.


Ensemble de données distribué (RDD)

Collection parallélisée

L'exécution parallèle devient possible.

Scala


val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

Java


List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

Python


data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

Ensemble de données externes

Scala


val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt

Java


JavaRDD<String> distFile = sc.textFile("data.txt");

Python


distFile = sc.textFile("data.txt")

Fonctionnement RDD

De base

Récupérez les données avec textFile et mettez-les sur rdd Convertir avec la carte Agréger avec réduire

Scala


val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

Java


JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

Python


lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

Passer une fonction d'étincelle

Scala


object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

Java


JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});

Python


"""MyScript.py"""
if __name__ == "__main__":
    def myFunc(s):
        words = s.split(" ")
        return len(words)

    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)

Comprendre la fermeture

Exemple

Scala


var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

Java


int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value: " + counter);

Python


counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)

Manipulation des paires clé-valeur

Scala


val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

Java


JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

Python


lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

conversion

conversion sens
map(func) Convertissez en un nouvel ensemble de données distribué formé en passant chaque élément de la source avec la fonction func.
filter(func) Sélectionne l'élément source pour lequel func renvoie true et renvoie un nouvel ensemble de données.
flatMap(func) Similaire à map, mais chaque élément d'entrée peut être mappé à 0 ou plusieurs éléments de sortie (func doit renvoyer Seq au lieu d'un seul élément).
mapPartitions(func) Similaire à une carte, mais exécuté individuellement sur chaque partition (bloc) du RDD, donc lors de l'exécution sur un RDD de type T, func est un itérateur => Iterator doit être.
mapPartitionsWithIndex(func) Similaire à mapPartitions, mais func reçoit une valeur entière qui représente l'index de la partition. Par conséquent, lors de l'exécution avec le type T RDD, func est (Int, Iterator))=> Iterator Doit être de type.
sample(withReplacement, fraction, seed) Utilise le générateur de nombres aléatoires spécifié pour échantillonner une partie fractionnaire des données avec ou sans substitution.
union(otherDataset) Renvoie un nouvel ensemble de données contenant la somme des éléments et des arguments de l'ensemble de données source.
intersection(otherDataset) Renvoie un nouveau RDD contenant les parties communes des éléments et arguments de l'ensemble de données source.
distinct([numTasks])) Renvoie un nouvel ensemble de données contenant différents éléments de l'ensemble de données source.
groupByKey([numTasks]) Lorsqu'il est appelé avec l'ensemble (K, V) d'ensembles de données, (K, Iterable)) Renvoie un ensemble d'ensembles de données. Remarque: l'utilisation de reduceByKey ou aggregateByKey peut considérablement améliorer les performances lors du regroupement par clé pour effectuer des agrégations (telles que des sommes et des moyennes). Remarque: Par défaut, le degré de parallélisme de la sortie dépend du nombre de partitions dans le RDD parent. Vous pouvez définir un nombre différent de tâches en transmettant l'argument optionnel numTasks.
reduceByKey(func, [numTasks]) Lorsqu'elle est appelée avec une paire (K, V) d'ensembles de données, la valeur de chaque clé est (V, V)=>Agrégé à l'aide de la fonction de réduction typée func (K, V) V.Comme avec groupByKey, le nombre de tâches de réduction peut être défini via le deuxième argument facultatif.
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) Lorsqu'il est appelé avec une paire (K, V) d'ensembles de données, il renvoie une paire (K, U) d'ensembles de données. Ici, la valeur de chaque clé est agrégée à l'aide de la fonction de jointure spécifiée et d'une valeur neutre "zéro". Autorisez les types de valeur agrégée qui sont différents des types de valeur d'entrée, tout en évitant les allocations inutiles. Comme avec groupByKey, le nombre de tâches de réduction peut être défini avec le deuxième argument facultatif.
sortByKey([ascending], [numTasks]) Lorsque K est appelé avec un ensemble de données de paires (K, V) qui implémente Ordered, un ensemble de données de paires (K, V) triées par clé ascendante ou décroissante, comme spécifié par l'argument ascendant booléen. Retour.
join(otherDataset, [numTasks]) Lorsqu'il est appelé avec des ensembles de données de type (K, V) et (K, W), il renvoie un ensemble de données de paires (K, (V, W)) contenant toutes les paires d'éléments pour chaque clé. Les jointures externes sont prises en charge par leftOuterJoin, rightOuterJoin et fullOuterJoin.
cogroup(otherDataset, [numTasks]) (K、(Iterable 、Iterable )) Renvoie le jeu de données tuple. Cette opération est également connue sous le nom de groupWith.
cartesian(otherDataset) Lorsqu'il est appelé sur un ensemble de données de type T et de type U, il renvoie un ensemble de données de paires (T, U) (toutes les paires d'éléments).
pipe(command, [envVars]) Commandes shell pour chaque partition du RDD (par exemple, script Perl ou bash. Les éléments RDD sont écrits dans le stdin du processus et les lignes imprimées sur stdout sont renvoyées sous forme de RDD.
coalesce(numPartitions) Réduisez le nombre de partitions dans le RDD à numPartitions. Ceci est utile pour effectuer des opérations plus efficacement après le filtrage d'un grand ensemble de données.
repartition(numPartitions) Remaniez aléatoirement les données dans le RDD pour créer plus ou moins de partitions et équilibrer ces partitions. Cela garantit que toutes les données du réseau sont toujours mélangées.
repartitionAndSortWithinPartitions(partitioner) Repartitionne le RDD en fonction du partitionneur spécifié et trie les enregistrements par clé dans chaque partition résultante. C'est plus efficace que d'appeler la subdivision et il est plus efficace de trier dans chaque partition car vous pouvez pousser le tri vers le mécanisme de lecture aléatoire.

action

action sens
reduce(func) Utilisez la fonction func (qui prend deux arguments et en renvoie un) pour agréger les éléments de l'ensemble de données. Les fonctions doivent être convertibles et associatives pour pouvoir être calculées correctement en parallèle.
collect() Le programme pilote renvoie tous les éléments de l'ensemble de données sous forme de tableau. Cela est généralement utile après les filtres et autres opérations qui renvoient un sous-ensemble suffisamment petit de données.
count() Renvoie le nombre d'éléments dans l'ensemble de données.
first() Renvoie le premier élément de l'ensemble de données (similaire à take (1)).
take(n) Renvoie un tableau contenant les n premiers éléments de l'ensemble de données.
takeSample(withReplacement, num, [seed]) Renvoie un tableau contenant des échantillons aléatoires des nombres éléments de l'ensemble de données, pré-spécifiés avec des graines de générateur aléatoires, avec ou sans substitutions.
takeOrdered(n, [ordering]) Renvoie les n premiers éléments du RDD, en utilisant l'ordre naturel ou un comparateur personnalisé.
saveAsTextFile(path) Décrivez les éléments du fichier de données sous forme de fichier texte (ou ensemble de fichiers texte) dans un répertoire spécifique sur votre système de fichiers local, HDFS ou tout autre système de fichiers pris en charge par Hadoop. Spark appelle le toString de chaque élément pour le convertir en une ligne de texte dans le fichier.
saveAsSequenceFile(path)
(Java and Scala) Écrit les éléments du fichier de données sous forme de fichier de séquence Hadoop dans le chemin spécifié du système de fichiers local, HDFS ou de tout autre système de fichiers pris en charge par Hadoop. Il est disponible dans le RDD des paires clé / valeur qui implémentent l'interface Writable de Hadoop. Dans Scala, vous pouvez également utiliser des types qui peuvent être implicitement convertis en Writable (Spark inclut la conversion de types de base tels que Int, Double, String).
saveAsObjectFile(path)
(Java and Scala) Utilisez la sérialisation Java pour décrire les éléments d'un ensemble de données dans un format simple.La sérialisation Java est SparkContext.Il peut être chargé en utilisant objectFile ().
countByKey() Uniquement disponible pour les RDD de type (K, V). Renvoie une table de hachage de paires (K, Int), en comptant chaque clé.
foreach(func) Exécutez la fonction func pour chaque élément de l'ensemble de données. Ceci est généralement effectué pour les effets secondaires tels que les mises à jour des accumulateurs et les interactions avec les systèmes de stockage externes. Remarque: la modification de variables autres que les accumulateurs en dehors de foreach () peut entraîner un comportement indéfini. Pour plus d'informations, voir Comprendre les fermetures.

Commençons par le déplacer (édition python)

J'ai bifurqué parce qu'il était très facile de comprendre ce qui était utilisé dans les compétitions à l'étranger. Puisqu'il s'agit d'un Jupyter, exécutez-le uniquement dans l'ordre du haut.

Cliquez ici pour la source https://github.com/miyamotok0105/spark-py-notebooks


table des matières

Créer un RDD

A propos de la lecture et de la parallélisation des fichiers

Principes de base du RDD

À propos de la carte, du filtrage, de la collecte

Échantillonnage RDD

Explique la méthode d'échantillonnage RDD.

Fonctionnement de l'ensemble RDD

Une brève introduction à certaines opérations de pseudo-ensembles RDD.

Agrégation de données sur RDD

À propos des actions RDD réduire, plier, agréger.

Fonctionnement de la paire clé / valeur RDD

Comment gérer les paires clé / valeur pour agréger et explorer les données.


MLlib: Statistiques de base et analyse exploratoire des données

Un cahier qui présente les statistiques de base de MLlib pour les types de vecteurs locaux, l'analyse exploratoire des données et la sélection de modèles.

MLlib: retour logistique

Classification des points étiquetés et régressions logistiques pour les attaques réseau dans MLlib. Application de la méthode de sélection du modèle utilisant la matrice de corrélation et le test d'hypothèse.

MLlib: Arbre de décision

Une méthode qui aide à expliquer l'utilisation de méthodes basées sur des arbres et la sélection de modèles et de fonctionnalités.

Spark SQL: traitement structuré pour l'analyse des données

Ce bloc-notes déduit le schéma d'un ensemble de données d'interactions réseau. Sur cette base, nous utilisons l'abstraction SQL DataFrame de Spark pour effectuer une analyse de données exploratoire plus structurée.

Clustering avec MLlib (K Means)

Processus de regroupement des données d'iris.


Contenu principal

Créer un RDD

data_file = "./kddcup.data_10_percent.gz"
#Création générale
raw_data = sc.textFile(data_file) 
#Créer un parallèle
raw_data = sc.parallelize(data_file) 

Principes de base du RDD

#Conversion de filtre
normal_raw_data = raw_data.filter(lambda x: 'normal.' in x)
#Conversion de carte
csv_data = raw_data.map(lambda x: x.split(","))

Échantillonnage RDD

Renvoie un tableau contenant des échantillons aléatoires des nombres éléments de l'ensemble de données, pré-spécifiés avec une graine de générateur aléatoire.

raw_data_sample = raw_data.takeSample(False, 400000, 1234)

Définir le fonctionnement de RDD

normal_raw_data = raw_data.filter(lambda x: "normal." in x)
#Soustraire
attack_raw_data = raw_data.subtract(normal_raw_data)
#Produit cartésien (ensemble de produits direct)
product = protocols.cartesian(services).collect()
print "There are {} combinations of protocol X service".format(len(product))

Recommandation par co-filtrage Mlib

Qu'est-ce que le co-filtrage?

Il s'agit d'une recommandation de produit aux clients en utilisant une matrice d'utilisateurs et d'articles. À partir de cette matrice, on peut dire qu'il s'agit d'un mécanisme pour analyser la corrélation des utilisateurs et faire des recommandations basées sur l'hypothèse que des utilisateurs similaires achèteront les produits qu'ils achètent. Référence

Collaborative_filtering.gif


Basé sur le contenu et co-filtrage

Filtrage coopératif --Recommandation basée sur le comportement de l'utilisateur

Filtrage basé sur le contenu (basé sur le contenu) --Similarité triée par vecteur de caractéristique d'élément et recommandée

Détails


Avantages et inconvénients du filtrage basé sur le contenu et du co-filtrage

スクリーンショット 2017-03-03 9.59.31.png

Chargement du module de recommandation

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

Faire des recommandations

model = ALS.train(ratings, rank, numIterations)

Prévoir

predictions_all = model.predictAll(sc.parallelize(f_XY)).map(lambda r: ((r[0], r[1]), limitter(r[2]) ))

Analyse pratique des données avec le casier d'apprentissage Spark-Machine pour des données à grande échelle

https://www.oreilly.co.jp/books/9784873117508/

Télécharger la source. Complètement Scala. Et ce livre a une couleur Scala assez forte. J'écris en partant du principe que je connais Spark.

https://github.com/sryza/aas.git
git checkout 1st-edition

Chapitre 2 Recommandations musicales et jeu de données Audioscrobbler

Obtenez des données

wget http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz
tar xzvf profiledata_06-May-2005.tar.gz 
résultat
profiledata_06-May-2005/
profiledata_06-May-2005/artist_data.txt
profiledata_06-May-2005/README.txt
profiledata_06-May-2005/user_artist_data.txt
profiledata_06-May-2005/artist_alias.txt

La source https://github.com/sryza/aas/blob/1st-edition/ch03-recommender/src/main/scala/com/cloudera/datascience/recommender/RunRecommender.scala


Plus α

Un exemple de framework d'apprentissage profond sur Spark

BigDL(torch base) https://github.com/intel-analytics/BigDL TensorFlow https://github.com/yahoo/TensorFlowOnSpark keras https://github.com/maxpumperla/elephas


## Docker tout-en-un https://github.com/jupyter/docker-stacks/tree/master/all-spark-notebook Matériel facile à comprendre sur Docker https://www.slideshare.net/ShinichiroOhhara/docker2017

Si vous souhaitez appeler Jupyter à distance

http://qiita.com/joemphilips/items/de5d12723b9b88b5b090

Cela fonctionne certainement. J'étais en difficulté parce que j'ai eu une erreur avec l'autorisation, mais j'ai l'impression d'avoir une erreur parce que je n'avais pas assez de dossiers et de fichiers au départ. Je me souviens avoir ajouté quelque chose en regardant le journal des erreurs de Spark ou quelque chose.


Guide du programme Spark

http://spark.apache.org/docs/latest/programming-guide.html Mastering Apache Spark 2 https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-mllib/spark-mllib-transformers.html


Mot-clé Scala

http://yuroyoro.hatenablog.com/entry/20100317/1268819400

La liste est importante dans Scala Utilisez la classe Case Programme immuable


Filtrage coopératif

http://en.wikipedia.org/wiki/Collaborative_filtering http://d.hatena.ne.jp/EulerDijkstra/20130407/1365349866 https://www.slideshare.net/hoxo_m/ss-53305070

Configuration lors du clustering avec aws

https://www.youtube.com/watch?v=qIs4nNFgi0s

Recommended Posts

Premiers pas avec Spark
Premiers pas avec Cisco Spark REST-API
Premiers pas avec Android!
1.1 Premiers pas avec Python
Premiers pas avec Python
Premiers pas avec Django 1
Introduction à l'optimisation
Premiers pas avec Numpy
Premiers pas avec Python
Premiers pas avec Pydantic
Premiers pas avec Jython
Premiers pas avec Django 2
Traduire Premiers pas avec TensorFlow
Introduction aux fonctions Python
Introduction à Tkinter 2: Button
Premiers pas avec Go Assembly
Premiers pas avec PKI avec Golang ―― 4
Premiers pas avec Python Django (1)
Premiers pas avec Python Django (4)
Premiers pas avec Python Django (3)
Introduction à Python Django (6)
Premiers pas avec Django avec PyCharm
Premiers pas avec Python Django (5)
Premiers pas avec Python responder v2
Introduction à Git (1) Stockage d'historique
Premiers pas avec Sphinx. Générer docstring avec Sphinx
Premiers pas avec les applications Web Python
Premiers pas avec Sparse Matrix avec scipy.sparse
Premiers pas avec Julia pour Pythonista
Premiers pas avec Python Bases de Python
Commençant par USD sur Windows
Premiers pas avec les algorithmes génétiques Python
Premiers pas avec Python 3.8 sous Windows
Premiers pas avec Python pour les fonctions PHPer
Premiers pas avec CPU Steal Time
Grails pour commencer
Premiers pas avec python3 # 1 Apprenez les connaissances de base
Premiers pas avec Python Web Scraping Practice
Premiers pas avec Python pour PHPer-Super Basics
Premiers pas avec Python Web Scraping Practice
Premiers pas avec Dynamo de Python boto
Premiers pas avec Heroku, déploiement de l'application Flask
Premiers pas avec TDD avec Cyber-dojo chez MobPro
Démarrer avec Python avec 100 coups sur le traitement du langage
Django 1.11 a démarré avec Python3.6
Analyse du panier avec Spark (1)
Principes de base de MongoDB: Premiers pas avec CRUD avec JAVA
Premiers pas avec le dessin avec matplotlib: écrire des fonctions simples
Premiers pas avec la traduction japonaise du modèle séquentiel Keras
[Français] Premiers pas avec Rust pour les programmeurs Python
Django Getting Started Part 2 avec eclipse Plugin (PyDev)
Démarrez avec MicroPython
Premiers pas avec AWS IoT facilement en Python
Démarrez avec Mezzanine
Premiers pas avec le module ast de Python (à l'aide de NodeVisitor)
Matériel à lire lors de la mise en route de Python
Paramètres pour démarrer avec MongoDB avec python