I used Spark in the past, but I don't anymore and will soon forget it, so I decided to write down the basics.
(Much of this is second-hand knowledge, so comments and edit requests for mistakes are welcome.)
A Docker image that runs a Jupyter + PySpark environment is provided, which is convenient for trying locally: https://hub.docker.com/r/jupyter/pyspark-notebook/
This post is about PySpark, but Spark itself is written in Scala; PySpark is the layer that lets you use it from Python.
As I understand it, it works through IPC between the JVM and Python, and it is often said that the cost of converting data between Scala and Python is quite high.
Now let's try using it:
docker run -it -p 8888:8888 jupyter/pyspark-notebook
When you run this, a URL for port 8888 containing an access token is printed to the terminal (around the line that says "To access the notebook, ...").
Open that URL and the Jupyter page appears, giving you a simple environment where you can write code in a Jupyter Notebook.
From New, select Notebook: Python 3 to open a notebook.
The test code to see if it works is taken from the sample below (https://jupyter-docker-stacks.readthedocs.io/en/latest/using/specifics.html#in-a-python-notebook):
from pyspark.sql import SparkSession
spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
# do something to prove it works
spark.sql('SELECT "Test" as c1').show()
I'm not entirely sure about SparkSession, but my understanding is that it is roughly an instance of Spark itself.
If you run this and a table is printed, everything is working.
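As an aside, the builder can also take explicit settings. Here is a minimal sketch (the master URL and config value below are just example assumptions, not anything the image requires; if a session already exists, getOrCreate() simply returns it):

from pyspark.sql import SparkSession

# Example only: run locally on all cores and shrink the number of shuffle
# partitions, which tends to be overkill for tiny local experiments
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('SimpleApp') \
    .config('spark.sql.shuffle.partitions', '4') \
    .getOrCreate()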
Let's target this kind of data:
| id | name | gender | age |
|---|---|---|---|
| 1 | Satoshi | male | 10 |
| 2 | Shigeru | male | 10 |
| 3 | Kasumi | female | 12 |
Here's a simple definition of data in Python:
from typing import List, Tuple
Trainer = Tuple[int, str, str, int]
trainers: List[Trainer] = [
    (1, 'Satoshi', 'male', 10),
    (2, 'Shigeru', 'male', 10),
    (3, 'Kasumi', 'female', 12),
]
In Python's typing terms, the type of each row is Tuple[int, str, str, int].
And Spark also has a schema definition:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
trainers_schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('age', IntegerType(), True),
])
This defines the column schema on the Spark side. To convert the data defined in Python into a Spark DataFrame:
from pyspark.sql import DataFrame
trainers_df: DataFrame = spark.createDataFrame(
    spark.sparkContext.parallelize(trainers),
    trainers_schema
)
You now have a DataFrame called trainers_df.
Since data can be read from sources such as CSV files or MySQL, in practice you would usually read from such a data source rather than defining the data in code (in some cases this needs JDBC or Hadoop to be set up, which is touched on later). A rough sketch of what that can look like follows.
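For illustration, reading the same schema from a CSV file could look like the sketch below (the path is a hypothetical placeholder; reading over JDBC is touched on in the MySQL section later):

# Hypothetical path: read a headerless CSV using the schema defined above
trainers_csv_df = (
    spark.read
    .format('csv')
    .option('header', 'false')
    .schema(trainers_schema)
    .load('path/to/trainers.csv')
)
trainers_csv_df.show()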
If you want to dump this and see it:
trainers_df.show()
This prints the table as a few lines of formatted text:
+---+-------+------+---+
| id|   name|gender|age|
+---+-------+------+---+
|  1|Satoshi|  male| 10|
|  2|Shigeru|  male| 10|
|  3| Kasumi|female| 12|
+---+-------+------+---+
To get the values instead of just dumping them, call .collect():
result = trainers_df.collect()
print(result)
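.collect() returns a list of Row objects, so you can pick individual values out of the result, for example:

# Each element is a pyspark.sql.Row; fields can be read by name or attribute
first = result[0]
print(first['name'])  # 'Satoshi'
print(first.age)      # 10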
To export the DataFrame to CSV, write it like this:
trainers_df.coalesce(1).write.mode('overwrite').csv("path/to/output.csv")
Like the input, there are various other output destinations such as S3, MySQL and Elasticsearch.
.coalesce(1) merges the data, which is split across partitions, into a single partition. Without it, the CSV is written out still split into multiple files.
Alternatively, you can write the split files as they are and merge them afterwards with Hadoop's hdfs command.
Spark is basically lazily evaluated: transformations are only executed once you perform an action such as .collect(), so you shouldn't trigger such aggregations more often than necessary. A small illustration follows.
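As a small illustration (using the trainers_df defined above), the filter below is only a query plan until an action forces it to run:

# Nothing is computed yet: filter() just builds up a plan
adult_trainers_df = trainers_df.filter(trainers_df['age'] >= 12)

# Only when an action such as count() or collect() runs is the plan executed
print(adult_trainers_df.count())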
Just displaying the data isn't very interesting, so let's do something with it:
trainers_df.createOrReplaceTempView('trainers');
male_trainers_df = spark.sql('''
SELECT *
FROM trainers
WHERE gender = 'male'
''')
male_trainers_df.show()
This gives this result:
| id | name | gender | age |
|---|---|---|---|
| 1 | Satoshi | male | 10 |
| 2 | Shigeru | male | 10 |
DataFrame.createOrReplaceTempView(name) registers a DataFrame as a temporary SQL view.
You can then run SQL against the registered view with spark.sql(query) and get the result back as a DataFrame.
That way you can use Spark with the SQL you are already used to, which keeps the psychological barrier and learning cost low.
You can also write the same operation against the DataFrame directly, without registering a view:
male_trainers_df = trainers_df.filter(trainers_df['gender'] == 'male')
Depending on the situation this style can be easier to use, so it's case by case; one more example with column expressions follows.
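For example, here is a sketch of the same kind of query using pyspark.sql.functions.col to chain a filter, a projection, and an ordering without registering a view:

from pyspark.sql.functions import col

trainers_df \
    .filter(col('gender') == 'male') \
    .select('name', 'age') \
    .orderBy(col('age').desc()) \
    .show()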
Since you can use SQL, basic operations are no problem, but most of the time the reason you want Spark is that you want to apply some user-defined operation.
For example, something I wanted to do in the past was "run morphological analysis on article text and split it into words", which is hard to achieve with SQL alone.
However, Python has MeCab, and if you run morphological analysis with the MeCab library it splits the text for you without any effort, so even if, like me, you don't really understand the details, you can just throw the text at MeCab for now.
So how do you do that against a DataFrame in Spark?
The answer is to define a UDF (User-Defined Function).
(*There is also a technique of applying a lambda directly to an RDD instead of a DataFrame, but that performs poorly.)
To define a UDF, make the following definition:
from pyspark.sql.functions import udf
@udf(StringType())
def name_with_suffix(name: str, gender: str) -> str:
    return name + {'male': 'Kun', 'female': 'Mr.'}.get(gender, 'Mr')
spark.udf.register('name_with_suffix', name_with_suffix)
Applying the @udf(ReturnType) decorator to a function turns it into a UDF.
To use it from Spark SQL, register it with spark.udf.register(udf_name, udf); you can then call it in queries just like built-in functions such as COUNT().
By the way, you can also wrap an existing function without the decorator, as in udf_fn = udf(fn).
The example here appends a suffix to name depending on gender.
Let's apply this function as a UDF:
dearest_trainers = spark.sql('''
SELECT name_with_suffix(name, gender)
FROM trainers
''')
dearest_trainers.show()
The result is:
| name_with_suffix(name, gender) |
|---|
| SatoshiKun |
| ShigeruKun |
| KasumiMr. |
For this example you could argue that a CASE expression in plain SQL would be enough, and that's true, but UDFs become useful depending on what you want to do. The same UDF can also be applied through the DataFrame API, as sketched below.
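For example, the decorated UDF can be called directly from the DataFrame API; a minimal sketch (the alias name is arbitrary):

from pyspark.sql.functions import col

trainers_df.select(
    name_with_suffix(col('name'), col('gender')).alias('dearest_name')
).show()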
UDF
By the way, the morphological analysis and word splitting mentioned earlier would, as a rough image, look like the following function (in reality MeCab is used properly):
import re
from pyspark.sql.types import ArrayType

# Split the string on half-width/full-width spaces and punctuation marks
@udf(ArrayType(StringType()))
def wakachi(text: str) -> List[str]:
    return [
        word
        for word
        in re.split('[ !…]+', text)
        if len(word) > 0
    ]
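For reference, the MeCab version hinted at above might look roughly like the sketch below; it assumes the mecab-python3 package and a dictionary are installed on every worker, so treat it as an illustration rather than the exact code:

import MeCab  # assumption: mecab-python3 and a dictionary are available on the workers

@udf(ArrayType(StringType()))
def wakachi_mecab(text: str) -> List[str]:
    # Create the tagger inside the UDF so nothing unpicklable is captured;
    # '-Owakati' makes MeCab return the text split into space-separated words
    tagger = MeCab.Tagger('-Owakati')
    return tagger.parse(text).strip().split(' ')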
Either version can be applied as it is. Let's write the sample code again with the data changed:
Trainer = Tuple[int, str, str, int, str]

trainers: List[Trainer] = [
    (1, 'Satoshi', 'male', 10, 'Get Pokemon'),
    (2, 'Shigeru', 'male', 10, 'This is the best of all! It means to be strong!'),
    (3, 'Kasumi', 'female', 12, 'My policy is ... at least with water-type Pokemon ... at least spree!'),
]

trainers_schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('age', IntegerType(), True),
    StructField('comment', StringType(), True),
])

trainers_df = spark.createDataFrame(
    spark.sparkContext.parallelize(trainers),
    trainers_schema
)
trainers_df.createOrReplaceTempView('trainers');
wakachi_trainers_df = spark.sql('''
SELECT id, name, wakachi(comment)
FROM trainers
''')
wakachi_trainers_df.show()
The point here is that this time the UDF receives a str and returns it expanded into a List[str].
When I try this, it looks like this:
| id | name | wakachi(comment) |
|---|---|---|
| 1 | Satoshi | [Pokémon,get,That's right] |
| 2 | Shigeru | [This me,In the world,I... |
| 3 | Kasumi | [My,Policy,Mizu... |
Each expanded cell now holds a list, so the values are nested inside the column.
What if you want to expand each str into its own row? You can apply another function to expand them:
https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/functions.html#explode(org.apache.spark.sql.Column)
from pyspark.sql.functions import explode
wakachi_trainers_df = spark.sql('''
SELECT id, name, explode(wakachi(comment))
FROM trainers
''')
wakachi_trainers_df.show()
There is a function called explode, and applying it expands the nested elements into individual rows:
| id | name | col |
|---|---|---|
| 1 | Satoshi | Pokémon |
| 1 | Satoshi | get |
| 1 | Satoshi | That's right |
| 2 | Shigeru | This me |
| 2 | Shigeru | In the world |
| 2 | Shigeru | most |
| 2 | Shigeru | Strong |
| 2 | Shigeru | That's what it is |
| 3 | Kasumi | My |
| 3 | Kasumi | Policy |
| 3 | Kasumi | Mizu |
| 3 | Kasumi | type |
| 3 | Kasumi | In Pokemon |
| 3 | Kasumi | at least |
| 3 | Kasumi | at least |
| 3 | Kasumi | Spree |
| 3 | Kasumi | That |
As a further point, you can JOIN DataFrames together.
Just as with an ordinary JOIN in MySQL, you specify the columns to join on, and the DataFrames are combined based on them.
Let's add some more sample code and try a JOIN:
Pkmn = Tuple[int, int, str, int]
pkmns: List[Pkmn] = [
    (1, 1, 'Pikachu', 99),
    (2, 1, 'Charizard', 99),
    (3, 2, 'Eevee', 50),
    (4, 3, 'Goldeen', 20),
    (5, 3, 'Staryu', 30),
    (6, 3, 'Starmie', 40),
]

pkmns_schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('trainer_id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('level', IntegerType(), True),
])

pkmns_df = spark.createDataFrame(
    spark.sparkContext.parallelize(pkmns),
    pkmns_schema
)
pkmns_df.createOrReplaceTempView('pkmns');
trainer_and_pkmns_df = spark.sql('''
SELECT *
FROM trainers
INNER JOIN pkmns
ON trainers.id = pkmns.trainer_id
''')
trainer_and_pkmns_df.show()
| id | name | gender | age | comment | id | trainer_id | name | level |
|---|---|---|---|---|---|---|---|---|
| 1 | Satoshi | male | 10 | Get Pokemon | 1 | 1 | Pikachu | 99 |
| 1 | Satoshi | male | 10 | Get Pokemon | 2 | 1 | Charizard | 99 |
| 3 | Kasumi | female | 12 | My policy is ... Mizuta... | 4 | 3 | Goldeen | 20 |
| 3 | Kasumi | female | 12 | My policy is ... Mizuta... | 5 | 3 | Staryu | 30 |
| 3 | Kasumi | female | 12 | My policy is ... Mizuta... | 6 | 3 | Starmie | 40 |
| 2 | Shigeru | male | 10 | I'm the best... | 3 | 2 | Eevee | 50 |
By the way, there are many join types other than INNER JOIN and OUTER JOIN.
This article explains them clearly, so I'll link it here:
https://qiita.com/ryoutoku/items/70c35cb016dcb13c8740
JOINs are convenient because they let you do set-like operations.
For the concept behind each join type, the Venn diagrams on this page are easy to understand:
https://medium.com/@achilleus/https-medium-com-joins-in-apache-spark-part-1-dabbf3475690
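The same join can also be written with the DataFrame API, where the join type is passed as the how argument; a sketch:

# INNER JOIN via the DataFrame API; pass how='left', 'right', 'full', etc.
# for the other join types
trainer_and_pkmns_df = trainers_df.join(
    pkmns_df,
    trainers_df['id'] == pkmns_df['trainer_id'],
    how='inner'
)
trainer_and_pkmns_df.show()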
One thing to note is that JOINs are still costly and slow.
When running on a cluster, Spark apparently has to locate the matching data spread across the nodes, join it, and send it back, so the performance tuning described below becomes necessary.
In the real world, wrestling with huge datasets is daunting: if a job takes around 4 hours and fails near the end, you have to start over, and if that happens twice you've burned a whole working day and overtime is guaranteed.
So, to improve performance, you need to reduce the data so the JOIN is more efficient, change the partitioning scheme, and generally arrange things so that partitions are fragmented across the cluster as little as possible.
There is also a technique called Broadcast Join, which deliberately places a copy of a dataset on every node in the cluster to lower the cost of looking it up during the JOIN; a sketch follows.
Another important technique is calling .cache() on a DataFrame at appropriate checkpoints; in some cases this dramatically improves performance.
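A minimal sketch of that, assuming the joined DataFrame above gets reused several times:

# Keep the joined result in memory once the first action has computed it,
# so later actions don't redo the whole join
trainer_and_pkmns_df.cache()

trainer_and_pkmns_df.count()  # first action materializes and caches it
trainer_and_pkmns_df.show()   # reuses the cached data

# Release the memory when it is no longer needed
trainer_and_pkmns_df.unpersist()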
The official page on performance tuning covers these techniques and is a helpful reference:
https://spark.apache.org/docs/latest/sql-performance-tuning.html#broadcast-hint-for-sql-queries
MySQL
It's common to want to read data from a MySQL database and process it. In that case you need a MySQL JDBC connector so Spark can talk to MySQL; this person's article and its Docker image are helpful references.
That said, MySQL seems a bit awkward to handle from Spark (there are various pitfalls). A rough sketch of a JDBC read follows.
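Here is a rough sketch of what a JDBC read can look like; the host, database, table, and credentials below are placeholder assumptions, and the MySQL Connector/J JAR has to be on Spark's classpath (for example via spark.jars or --packages):

# All connection details here are hypothetical placeholders
mysql_df = (
    spark.read
    .format('jdbc')
    .option('url', 'jdbc:mysql://db-host:3306/pokedex')
    .option('driver', 'com.mysql.cj.jdbc.Driver')
    .option('dbtable', 'trainers')
    .option('user', 'spark_user')
    .option('password', 'secret')
    .load()
)
mysql_df.show()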
I think Spark shines when:

- The data is huge
- The operations you want to apply don't depend on each other
- Each operation has no side effects and is self-contained (no calls to external APIs)
Also, the whole point of Spark is to build a cluster of multiple machines and let the workers do the work, and in practice it seems better to leave that to AWS with Amazon EMR or AWS Glue. Running locally works without building a cluster, but if you feed it a seriously huge amount of data you get no performance benefit and miss the point.
You hit memory limits, and even if you could save money, with data so huge that a batch over the whole process takes two weeks, it may be simpler to split the work yourself and run it as multiple plain processes. If Spark can handle it, though, it's a good idea to leave it to Spark.