Try Apache Spark on Jupyter Notebook (on local Docker)

I used Spark in the past, but I don't use it anymore and will soon forget it, so I decided to write down the basics.

(Much of this is second-hand knowledge, so comments and edit requests pointing out mistakes are welcome.)

What to use

A Docker image that provides a Jupyter + PySpark environment is available, which is convenient for trying things out locally: https://hub.docker.com/r/jupyter/pyspark-notebook/

This article is about PySpark. Spark itself is written in Scala; PySpark is the layer that lets you use it from Python.

As I understand it, that layer works via IPC under the hood, so there is also talk that the cost of converting data between Scala and Python is quite high.

Now let's run it:

docker run -it -p 8888:8888 jupyter/pyspark-notebook

When you run this, a URL for port 8888 containing an access token is printed to the terminal (around the line that says "To access the notebook, ..."). Open that URL and the Jupyter page appears; you now have a simple environment for writing code in Jupyter Notebook.

(Screenshot: Jupyter home page, "Select or create a notebook")

From the New menu, select Notebook: Python 3 to open a notebook.

(Screenshot: a new, untitled notebook)

Try it out

The test code to check that it works is taken from this sample (https://jupyter-docker-stacks.readthedocs.io/en/latest/using/specifics.html#in-a-python-notebook):

from pyspark.sql import SparkSession

spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()

# do something to prove it works
spark.sql('SELECT "Test" as c1').show()

I'm not entirely sure what a SparkSession is, but my understanding is that it is something like an instance of Spark itself.

If running this produces a table, everything is working:

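For reference, the printed table should look roughly like this:

+----+
|  c1|
+----+
|Test|
+----+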

Handle data

Let's target this kind of data:

id  name     gender  age
1   Satoshi  male    10
2   Shigeru  male    10
3   Kasumi   female  12

Input and definition

Here's a simple definition of data in Python:

from typing import List, Tuple

Trainer = Tuple[int, str, str, int]
trainers: List[Trainer] = [
    (1, 'Satoshi', 'male',   10),
    (2, 'Shigeru', 'male',   10),
    (3, 'Kasumi', 'female', 12),
]

The type of each row is Tuple[int, str, str, int], using Python's typing module.

And Spark also has a schema definition:

from pyspark.sql.types import StructField, StructType, StringType, IntegerType

trainers_schema = StructType([
    StructField('id',      IntegerType(), True),
    StructField('name',    StringType(),  True),
    StructField('gender',  StringType(),  True),
    StructField('age',     IntegerType(), True),
])

This defines the column schema on the Spark side.

To convert data defined in Python to Spark's DataFrame:

from pyspark.sql import DataFrame

trainers_df: DataFrame = spark.createDataFrame(
    spark.sparkContext.parallelize(trainers),
    trainers_schema
)

You now have a DataFrame called trainers_df.

Since data can also be read from sources such as CSV or MySQL, in practice you would usually load it from such a data source rather than defining it in code. (In some cases you need to set up JDBC or Hadoop, as described later.)
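
For example, reading the same data from a CSV file might look roughly like this (the path is a placeholder, and the header/schema options depend on your actual file):

# a sketch of loading the data from CSV instead of defining it in code
trainers_df = spark.read.csv(
    'path/to/trainers.csv',   # placeholder path
    schema=trainers_schema,
    header=True,
)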

If you want to dump this and see it:

trainers_df.show()

This prints the DataFrame as a formatted text table:


+---+-------+------+---+
| id|   name|gender|age|
+---+-------+------+---+
|  1|Satoshi|  male| 10|
|  2|Shigeru|  male| 10|
|  3| Kasumi|female| 12|
+---+-------+------+---+

Aggregation and output

To get the values themselves rather than a printed dump, just call .collect():

result = trainers_df.collect()
print(result)
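
The result is a plain Python list of Row objects, something like this:

[Row(id=1, name='Satoshi', gender='male', age=10),
 Row(id=2, name='Shigeru', gender='male', age=10),
 Row(id=3, name='Kasumi', gender='female', age=12)]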

To export the DataFrame to CSV:

trainers_df.coalesce(1).write.mode('overwrite').csv("path/to/output.csv")

Like the input, there are various other output destinations such as S3, MySQL and Elasticsearch.

.coalesce(1) merges the data, which is split across partitions, into a single partition. If you don't do this, the CSV is written out split into one file per partition.

There is also a way to merge the split output afterwards using Hadoop's hdfs command.

Spark is basically lazily evaluated: nothing is computed until you perform an action like .collect(), so you shouldn't trigger such actions more often than necessary.
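
As a minimal sketch of that laziness, using the DataFrame defined above:

# filter() only builds up the execution plan; nothing runs yet
male_df = trainers_df.filter(trainers_df['gender'] == 'male')

# count() is an action, so this is the point where Spark actually executes
print(male_df.count())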

Basics

Just displaying the data isn't very useful on its own, so let's actually do something with it:

trainers_df.createOrReplaceTempView('trainers');

male_trainers_df = spark.sql('''
    SELECT *
    FROM   trainers
    WHERE  gender = 'male'
''')
male_trainers_df.show()

This gives this result:

id  name     gender  age
1   Satoshi  male    10
2   Shigeru  male    10

DataFrame.createOrReplaceTempView(name) registers the DataFrame as a temporary SQL view.

You can then run spark.sql(query) against the registered view and get the result back as a DataFrame. That way you can use Spark with the SQL you are already used to, so the psychological barrier and learning cost are low.

You can also write this as DataFrame operations without registering a view:

male_trainers_df = trainers_df.filter(trainers_df['gender'] == 'male')

Sometimes this style is easier to work with, so it depends on the case.

Applications

Since you can use SQL, basic operations are no problem; but most of the time, when you reach for Spark it's because you want to perform some user-defined operation.

For example, a case I had in the past was "run morphological analysis on article text and split it into words". That is hard to achieve with SQL alone.

However, MeCab is available from Python, and if you use the MeCab library for morphological analysis it splits the text for you, so even if, like me, you don't really understand the details, you can just throw the text at MeCab for the time being.

So how do you do that to a DataFrame in Spark? The answer is to define a UDF (User-Defined Function).

(Note: you can also apply a lambda directly to an RDD instead of a DataFrame, but that performs poorly.)

To define a UDF, make the following definition:

from pyspark.sql.functions import udf

@udf(StringType())
def name_with_suffix(name: str, gender: str) -> str:
    # append an honorific suffix depending on gender
    return name + {'male': '-kun', 'female': '-san'}.get(gender, '-san')

spark.udf.register('name_with_suffix', name_with_suffix)

Applying the @udf(ReturnType) decorator to a function turns it into a UDF. To use it from Spark SQL, register it with spark.udf.register(udf_name, udf); you can then call it in queries just like a built-in function such as COUNT().

By the way, you can also wrap an existing function without the decorator, as in udf_fn = udf(fn).
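
As a sketch of that style, here is a hypothetical name_length function (made up for illustration) wrapped without the decorator:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def name_length(name: str) -> int:
    return len(name)

# wrap the existing function as a UDF and register it for use in SQL
name_length_udf = udf(name_length, IntegerType())
spark.udf.register('name_length', name_length_udf)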

The name_with_suffix example appends an honorific suffix ('-kun' or '-san') to name depending on gender. Let's apply it as a UDF:

dearest_trainers = spark.sql('''
    SELECT name_with_suffix(name, gender)
    FROM   trainers
''')
dearest_trainers.show()

The result is:

name_with_suffix(name, gender)
Satoshi-kun
Shigeru-kun
Kasumi-san

For this particular example you could argue that a CASE expression in plain SQL would do the job, and that's true.
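
For reference, a SQL-only version of the same thing might look roughly like this, using CONCAT and CASE:

suffixed_trainers = spark.sql('''
    SELECT CONCAT(name,
                  CASE WHEN gender = 'male' THEN '-kun' ELSE '-san' END
           ) AS name_with_suffix
    FROM   trainers
''')
suffixed_trainers.show()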

Still, a UDF can be useful depending on what you want to do.

UDF

By the way, the morphological analysis and word splitting mentioned above would look roughly like the following function (in reality I use MeCab for this):

import re

from pyspark.sql.types import ArrayType

# split the string on half-width/full-width spaces and punctuation
@udf(ArrayType(StringType()))
def wakachi(text: str) -> List[str]:
    return [
        word
        for word
        in re.split('[  !…]+', text)
        if len(word) > 0
    ]
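
Incidentally, the MeCab-based version mentioned above would presumably look something like this (a sketch assuming the mecab-python3 package and a dictionary are installed on every worker):

import MeCab

@udf(ArrayType(StringType()))
def wakachi_mecab(text: str) -> List[str]:
    # '-Owakati' makes MeCab return the text split into words by spaces
    tagger = MeCab.Tagger('-Owakati')
    return tagger.parse(text).split()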

Either way, the regex version above can be applied as-is. Let's rewrite the sample code with new data:

Trainer = Tuple[int, str, str, int, str]
trainers: List[Trainer] = [
    (1, 'Satoshi', 'male',   10, 'Get Pokemon'),
    (2, 'Shigeru', 'male',   10, 'This is the best of all! It means to be strong!'),
    (3, 'Kasumi', 'female', 12, 'My policy is ... at least with water-type Pokemon ... at least spree!'),
]

trainers_schema = StructType([
    StructField('id',      IntegerType(), True),
    StructField('name',    StringType(),  True),
    StructField('gender',  StringType(),  True),
    StructField('age',     IntegerType(), True),
    StructField('comment', StringType(),  True),
])

trainers_df = spark.createDataFrame(
    spark.sparkContext.parallelize(trainers),
    trainers_schema
)
trainers_df.createOrReplaceTempView('trainers');

wakachi_trainers_df = spark.sql('''
    SELECT id, name, wakachi(comment)
    FROM   trainers
''')
wakachi_trainers_df.show()

The point here is that this UDF takes a str and returns a List[str]. Running it gives something like this:

id  name     wakachi(comment)
1   Satoshi  [Pokémon, get, That's right]
2   Shigeru  [This me, In the world, I...
3   Kasumi   [My, Policy, Mizu...

The resulting cells each contain a list, so the column is nested: there are values inside the values.

What if you want to expand each element of those lists into its own row? You can apply another function to do that:

https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/functions.html#explode(org.apache.spark.sql.Column)

from pyspark.sql.functions import explode

wakachi_trainers_df = spark.sql('''
    SELECT id, name, explode(wakachi(comment))
    FROM   trainers
''')
wakachi_trainers_df.show()

There is a function called explode; applying it expands each nested element into its own row:

id  name     col
1   Satoshi  Pokémon
1   Satoshi  get
1   Satoshi  That's right
2   Shigeru  This me
2   Shigeru  In the world
2   Shigeru  most
2   Shigeru  Strong
2   Shigeru  That's what it is
3   Kasumi   My
3   Kasumi   Policy
3   Kasumi   Mizu
3   Kasumi   type
3   Kasumi   In Pokemon
3   Kasumi   at least
3   Kasumi   at least
3   Kasumi   Spree
3   Kasumi   That

JOIN

Going further, you can JOIN DataFrames together. You specify the column to join on, just as with an ordinary MySQL JOIN, and the DataFrames are combined based on it.

Let's add more sample code and use JOIN:

Pkmn = Tuple[int, int, str, int]
pkmns: List[Pkmn] = [
    (1, 1, 'Pikachu', 99),
    (2, 1, 'Charizard', 99),
    (3, 2, 'Eevee',   50),
    (4, 3, 'Goldeen', 20),
    (5, 3, 'Staryu', 30),
    (6, 3, 'Starmie', 40),
]
pkmns_schema = StructType([
    StructField('id',         IntegerType(), True),
    StructField('trainer_id', IntegerType(), True),
    StructField('name',       StringType(),  True),
    StructField('level',      IntegerType(), True),
])
pkmns_df = spark.createDataFrame(
    spark.sparkContext.parallelize(pkmns),
    pkmns_schema
)
pkmns_df.createOrReplaceTempView('pkmns');

trainer_and_pkmns_df = spark.sql('''
    SELECT     *
    FROM       trainers
    INNER JOIN pkmns
          ON   trainers.id = pkmns.trainer_id
''')
trainer_and_pkmns_df.show()

id  name     gender  age  comment                     id  trainer_id  name       level
1   Satoshi  male    10   Get Pokemon                 1   1           Pikachu    99
1   Satoshi  male    10   Get Pokemon                 2   1           Charizard  99
3   Kasumi   female  12   My policy is ... Mizuta...  4   3           Goldeen    20
3   Kasumi   female  12   My policy is ... Mizuta...  5   3           Staryu     30
3   Kasumi   female  12   My policy is ... Mizuta...  6   3           Starmie    40
2   Shigeru  male    10   I'm the best...             3   2           Eevee      50

By the way, there are many join types besides INNER JOIN and OUTER JOIN. This article explains them clearly, so I'll link it:

https://qiita.com/ryoutoku/items/70c35cb016dcb13c8740

Joins are convenient because they let you work on related datasets together.
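
The same join can also be written with the DataFrame API; the how argument is where you pick the join type (a sketch, using the DataFrames defined above):

joined_df = trainers_df.join(
    pkmns_df,
    trainers_df['id'] == pkmns_df['trainer_id'],
    how='inner',  # change to 'left', 'outer', etc. for other join types
)
joined_df.show()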

For the concept behind each join type, the Venn diagrams on this page are easy to understand:

https://medium.com/@achilleus/https-medium-com-joins-in-apache-spark-part-1-dabbf3475690

One thing to note is that JOINs are still costly and slow. On a cluster, Spark apparently has to find the matching rows among data distributed across many places, join them, and bring the result back.

Therefore, performance tuning, which will be described later, is required.

Performance

In the real world, wrestling with huge datasets is tough. If a job takes around four hours and fails near the end, you have to start over; fail twice and you've used up a whole working day and overtime is guaranteed.

So to improve performance, you reduce the data to make JOINs more efficient, change how the data is partitioned, and generally try to keep partitions from being fragmented across the cluster.

There is also a technique called a broadcast join: by deliberately placing a (small) dataset on every node in the cluster, you lower the cost of looking it up during the JOIN.

Another important technique is calling .cache() on a DataFrame at key checkpoints; in some cases this improves performance dramatically.
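
A rough sketch of those two techniques, using the DataFrames from earlier (broadcast() hints that the smaller DataFrame should be copied to every node, and cache() keeps the joined result in memory for reuse):

from pyspark.sql.functions import broadcast

# hint that the small pkmns_df should be broadcast to every node
fast_joined_df = trainers_df.join(
    broadcast(pkmns_df),
    trainers_df['id'] == pkmns_df['trainer_id'],
)

# cache the result, then run an action so the cache is actually materialized
fast_joined_df.cache()
fast_joined_df.count()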

The official page on performance tuning covers techniques like these and is a useful reference:

https://spark.apache.org/docs/latest/sql-performance-tuning.html#broadcast-hint-for-sql-queries

MySQL

It's also common to want to read data from a MySQL database and work with it. In that case you need the MySQL JDBC connector to talk to MySQL; this person's article and the accompanying Docker image are helpful here.

However, MySQL seems to be a bit awkward to handle from Spark (there are various pitfalls).
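
A rough sketch of reading a table over JDBC, assuming the MySQL Connector/J jar is on Spark's classpath (the host, database, table, and credentials below are all placeholders):

mysql_df = (
    spark.read.format('jdbc')
    .option('url', 'jdbc:mysql://mysql-host:3306/mydb')
    .option('driver', 'com.mysql.cj.jdbc.Driver')
    .option('dbtable', 'trainers')
    .option('user', 'user')
    .option('password', 'password')
    .load()
)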

In practice

I think Spark really shines when:

- the data is genuinely huge
- the processing steps don't depend on each other
- each operation is free of side effects and self-contained (no calls to external APIs)

Also, the whole point of Spark is building a cluster of multiple machines and letting the workers do the work. In practice it seems better to leave that to AWS with something like Amazon EMR or AWS Glue. Locally, Spark runs without a cluster, so even if you feed it a seriously large amount of data you get no performance benefit from it.

When you hit memory limits, or when the data is so huge that batching the whole pipeline would take two weeks, you might be able to split the work up yourself and run it as multiple processes if it's simple enough, but where Spark can handle it, it's a good idea to leave it to Spark.
