[PYTHON] Getting started with Spark

What is Spark

Lightning-fast cluster computing: a library for distributing large-scale batch processing. It handles distributed processing well, and on top of it you can use SQL, process streaming data, do machine learning, work with graphs, and even run deep learning. All of this makes heavy use of memory and runs across the cluster at high speed.


Tested environment


Spark installation

JDK installation

Ubuntu


sudo apt-get install -y openjdk-8-jdk

Mac


brew cask install java

Maven installation

Ubuntu


sudo apt install maven

Mac


brew install maven

Spark installation

Let /usr/local/spark be SPARK_HOME. Pick any version you like from http://ftp.riken.jp/net/apache/spark/

Ubuntu


$ wget http://ftp.riken.jp/net/apache/spark/spark-1.6.2/spark-1.6.2-bin-hadoop2.6.tgz
$ tar zxvf spark-1.6.2-bin-hadoop2.6.tgz
$ sudo mv spark-1.6.2-bin-hadoop2.6 /usr/local/
$ sudo ln -s /usr/local/spark-1.6.2-bin-hadoop2.6 /usr/local/spark

Add the following to .bashrc

Ubuntu


export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin

Mac


brew install apache-spark

Run spark-shell

scala


$ spark-shell --master local[*]
(Omission)
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.
(Omission)

scala> val textFile = sc.textFile("/usr/local/spark/README.md")
textFile: org.apache.spark.rdd.RDD[String] = /usr/local/spark/README.md MapPartitionsRDD[1] at textFile at <console>:27

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:29

scala> wordCounts.collect()
res0: Array[(String, Int)] = Array((package,1), (For,2), (Programs,1), (processing.,1), ...(Omission)...,  (>>>,1), (programming,1), (T...
scala>

Check that it works in the console. To use it from Python:

python


./bin/pyspark
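
In the pyspark shell, sc is already defined, so the same word count as the spark-shell example above looks like this (a minimal sketch, reusing the README path from earlier):

Python

lines = sc.textFile("/usr/local/spark/README.md")
word_counts = (lines.flatMap(lambda line: line.split(" "))
                    .map(lambda word: (word, 1))
                    .reduceByKey(lambda a, b: a + b))
word_counts.collect()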

If you want to run pyspark in Jupyter

Add the following to .bashrc

bash


#spark
export SPARK_HOME=/usr/local/spark/spark-1.6.2-bin-hadoop2.6
export PATH=$PATH:$SPARK_HOME/bin
#jupyter spark
export PYSPARK_PYTHON=$PYENV_ROOT/shims/python #Match the path according to the environment
export PYSPARK_DRIVER_PYTHON=$PYENV_ROOT/shims/jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"

bash


source .bashrc
pyspark

Running the pyspark command now launches Jupyter. If you get an error saying that Spark's RDD (the SparkContext) is not available, restarting the kernel fixed it for me.
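
As a quick sanity check in the notebook (assuming the kernel was started through the pyspark command, so sc already exists):

Python

# sc is created for you when the notebook is launched via pyspark.
print(sc)
print(sc.parallelize(range(10)).sum())  # prints 45 if the context is working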


Resilient Distributed Datasets (RDDs)

Parallelized collection

Parallelizing a collection makes it possible to operate on its elements in parallel.

Scala


val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

Java


List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

Python


data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

External data set

Scala


val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt

Java


JavaRDD<String> distFile = sc.textFile("data.txt");

Python


distFile = sc.textFile("data.txt")

RDD operation

Basic

Load the data into an RDD with textFile, transform it with map, then aggregate it with reduce.

Scala


val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

Java


JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

Python


lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

Passing functions to Spark

Scala


object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

Java


JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});

Python


"""MyScript.py"""
if __name__ == "__main__":
    def myFunc(s):
        words = s.split(" ")
        return len(words)

    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)

Understand closures

Example

Scala


var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

Java


int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value: " + counter);

Python


counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)

Manipulating key-value pairs

Scala


val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

Java


JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

Python


lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

Transformations

| Transformation | Meaning |
|---|---|
| map(func) | Returns a new distributed dataset formed by passing each element of the source through the function func. |
| filter(func) | Returns a new dataset formed by selecting those elements of the source on which func returns true. |
| flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). |
| mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T. |
| mapPartitionsWithIndex(func) | Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T. |
| sample(withReplacement, fraction, seed) | Samples a fraction of the data, with or without replacement, using the given random number generator seed. |
| union(otherDataset) | Returns a new dataset that contains the union of the elements in the source dataset and the argument. |
| intersection(otherDataset) | Returns a new RDD that contains the intersection of elements in the source dataset and the argument. |
| distinct([numTasks]) | Returns a new dataset that contains the distinct elements of the source dataset. |
| groupByKey([numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. Note: if you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will give much better performance. Note: by default, the level of parallelism in the output depends on the number of partitions of the parent RDD; you can pass an optional numTasks argument to set a different number of tasks. |
| reduceByKey(func, [numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V, V) => V. As with groupByKey, the number of reduce tasks is configurable through an optional second argument. |
| aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different from the input value type, while avoiding unnecessary allocations. As with groupByKey, the number of reduce tasks is configurable through an optional second argument. |
| sortByKey([ascending], [numTasks]) | When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by key in ascending or descending order, as specified by the boolean ascending argument. |
| join(otherDataset, [numTasks]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin. |
| cogroup(otherDataset, [numTasks]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith. |
| cartesian(otherDataset) | When called on datasets of type T and U, returns a dataset of (T, U) pairs (all pairs of elements). |
| pipe(command, [envVars]) | Pipes each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin, and lines printed to its stdout are returned as an RDD of strings. |
| coalesce(numPartitions) | Decreases the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset. |
| repartition(numPartitions) | Reshuffles the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network. |
| repartitionAndSortWithinPartitions(partitioner) | Repartitions the RDD according to the given partitioner and, within each resulting partition, sorts records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery. |
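
As a rough Python sketch, a few of these transformations chained on a small in-memory dataset (the sample data here is made up for illustration):

Python

rdd = sc.parallelize(["a b", "b c", "c"])

words = rdd.flatMap(lambda line: line.split(" "))   # 'a', 'b', 'b', 'c', 'c'
pairs = words.map(lambda w: (w, 1))                 # ('a', 1), ('b', 1), ...
counts = pairs.reduceByKey(lambda a, b: a + b)      # ('a', 1), ('b', 2), ('c', 2), in some order
bs = words.filter(lambda w: w == "b")               # 'b', 'b'
uniques = words.distinct()                          # 'a', 'b', 'c', in some order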

Actions

| Action | Meaning |
|---|---|
| reduce(func) | Aggregates the elements of the dataset using the function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. |
| collect() | Returns all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data. |
| count() | Returns the number of elements in the dataset. |
| first() | Returns the first element of the dataset (similar to take(1)). |
| take(n) | Returns an array with the first n elements of the dataset. |
| takeSample(withReplacement, num, [seed]) | Returns an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed. |
| takeOrdered(n, [ordering]) | Returns the first n elements of the RDD using either their natural order or a custom comparator. |
| saveAsTextFile(path) | Writes the elements of the dataset as a text file (or set of text files) in a given directory on the local file system, HDFS, or any other Hadoop-supported file system. Spark calls toString on each element to convert it to a line of text in the file. |
| saveAsSequenceFile(path) (Java and Scala) | Writes the elements of the dataset as a Hadoop SequenceFile at the given path on the local file system, HDFS, or any other Hadoop-supported file system. Available on RDDs of key/value pairs that implement Hadoop's Writable interface. In Scala it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types such as Int, Double, and String). |
| saveAsObjectFile(path) (Java and Scala) | Writes the elements of the dataset in a simple format using Java serialization, which can then be loaded with SparkContext.objectFile(). |
| countByKey() | Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. |
| foreach(func) | Runs the function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. Note: modifying variables other than Accumulators outside of foreach() may result in undefined behavior. See Understanding closures for more details. |
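
And a few of the actions in Python, again on a small made-up dataset:

Python

rdd = sc.parallelize([5, 3, 1, 4, 2])

rdd.count()                     # 5
rdd.first()                     # 5
rdd.take(3)                     # [5, 3, 1]
rdd.takeOrdered(3)              # [1, 2, 3]
rdd.reduce(lambda a, b: a + b)  # 15
rdd.collect()                   # [5, 3, 1, 4, 2]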

Try it out first (Python edition)

I forked this repository because it is a very clear example of what is used in competitions overseas. The notebooks are Jupyter, so just run the cells in order from the top.

Click here for source https://github.com/miyamotok0105/spark-py-notebooks


Table of contents

Creating an RDD

About reading and parallelizing files

RDD basics

About map, filter, collect

RDD sampling

Explains the RDD sampling method.

RDD set operation

A brief introduction to some RDD pseudo-set operations.

Data aggregation on RDD

About RDD actions reduce, fold, aggregate.

Manipulating key / value pairs RDD

How to handle key / value pairs for aggregating and exploring data.


MLlib: Basic statistics and exploratory data analysis

A notebook that presents MLlib's basic statistics for local vector types, Exploratory Data Analysis and model selection.

MLlib: Logistic regression

Classification of labeled points and logistic regression for network attacks in MLlib. Application of model selection method using correlation matrix and hypothesis test.

MLlib: Decision Tree

Explains the use of tree-based methods and how they help with model and feature selection.

Spark SQL: Structured processing for data analysis

This notebook infers the schema for a dataset of network interactions. Based on that, we use Spark's SQL DataFrame abstraction to perform more structured exploratory data analysis.

Clustering with MLlib (KMeans)

Iris data clustering process.


Main contents

Creating an RDD

data_file = "./kddcup.data_10_percent.gz"
#General creation
raw_data = sc.textFile(data_file) 
#Create parallel
raw_data = sc.parallelize(data_file) 

RDD basics

#Filter conversion
normal_raw_data = raw_data.filter(lambda x: 'normal.' in x)
#Map conversion
csv_data = raw_data.map(lambda x: x.split(","))

RDD sampling

takeSample returns an array with a random sample of num elements of the dataset, optionally with a pre-specified random number generator seed.

raw_data_sample = raw_data.takeSample(False, 400000, 1234)

RDD set operations

normal_raw_data = raw_data.filter(lambda x: "normal." in x)
#Subtraction
attack_raw_data = raw_data.subtract(normal_raw_data)
#Cartesian product
product = protocols.cartesian(services).collect()
print "There are {} combinations of protocol X service".format(len(product))

Recommendation with MLlib collaborative filtering

What is collaborative filtering?

Collaborative filtering recommends products to customers using a matrix of users and items. From this matrix it analyzes correlations between users and makes recommendations on the assumption that similar users buy similar products. Reference



Content-based and collaborative filtering

Collaborative filtering: recommendations based on users' behavior.

Content-based filtering: recommendations based on similarity computed from item feature vectors.

Details


Advantages and disadvantages of content-based and collaborative filtering


Loading the recommendation module

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

Making recommendations

model = ALS.train(ratings, rank, numIterations)

Prediction

predictions_all = model.predictAll(sc.parallelize(f_XY)).map(lambda r: ((r[0], r[1]), limitter(r[2]) ))
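
Putting the three snippets above together, a minimal self-contained sketch; the ratings, rank, and iteration count below are made-up placeholders rather than values from the notebooks:

Python

from pyspark.mllib.recommendation import ALS, Rating

# Hypothetical (user, product, rating) triples, just for illustration.
ratings = sc.parallelize([
    Rating(1, 10, 5.0),
    Rating(1, 20, 1.0),
    Rating(2, 10, 4.0),
    Rating(2, 30, 2.0),
])

rank = 2            # number of latent factors
numIterations = 10
model = ALS.train(ratings, rank, numIterations)

# Predict ratings for (user, product) pairs that were not rated.
user_products = sc.parallelize([(1, 30), (2, 20)])
predictions = model.predictAll(user_products) \
                   .map(lambda r: ((r.user, r.product), r.rating))
print(predictions.collect())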

Practical data analysis with Spark: a machine learning casebook for large-scale data

https://www.oreilly.co.jp/books/9784873117508/

Download the source. It is entirely in Scala, and the book has a strong Scala flavor; it is written on the assumption that you already know Spark.

https://github.com/sryza/aas.git
git checkout 1st-edition

Chapter 2 Music Recommendations and Audioscrobbler Datasets

Get data

wget http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz
tar xzvf profiledata_06-May-2005.tar.gz

Result:

profiledata_06-May-2005/
profiledata_06-May-2005/artist_data.txt
profiledata_06-May-2005/README.txt
profiledata_06-May-2005/user_artist_data.txt
profiledata_06-May-2005/artist_alias.txt

Source https://github.com/sryza/aas/blob/1st-edition/ch03-recommender/src/main/scala/com/cloudera/datascience/recommender/RunRecommender.scala


Extras

Examples of deep learning frameworks that run on Spark:

BigDL (Torch-based): https://github.com/intel-analytics/BigDL
TensorFlow: https://github.com/yahoo/TensorFlowOnSpark
Keras: https://github.com/maxpumperla/elephas


All in Docker

https://github.com/jupyter/docker-stacks/tree/master/all-spark-notebook

Easy-to-understand material about Docker

https://www.slideshare.net/ShinichiroOhhara/docker2017

If you want to access Jupyter remotely

http://qiita.com/joemphilips/items/de5d12723b9b88b5b090

This definitely works. I ran into a permission error, but I think the root cause was that some folders and files were missing in the first place; I remember adding them after looking at Spark's error logs.


Spark Programming Guide

http://spark.apache.org/docs/latest/programming-guide.html

Mastering Apache Spark 2

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-mllib/spark-mllib-transformers.html


Scala keywords

http://yuroyoro.hatenablog.com/entry/20100317/1268819400

Lists are important in Scala. Use case classes. Write immutable programs.


Collaborative filtering

http://en.wikipedia.org/wiki/Collaborative_filtering
http://d.hatena.ne.jp/EulerDijkstra/20130407/1365349866
https://www.slideshare.net/hoxo_m/ss-53305070

Setup for clustering on AWS

https://www.youtube.com/watch?v=qIs4nNFgi0s
