[PYTHON] PySpark learning record ②: I tried Kaggle's Titanic competition using only PySpark


As practice in implementing data processing and machine learning with PySpark, I tried the Kaggle Titanic competition using only PySpark. Throughout, I compare against one question: "How do you do in PySpark the steps you usually do with Pandas and Scikit-learn?"

Execution environment

I pulled the jupyter/pyspark-notebook:latest image with Docker and used it as is (Python 3.8.6, Spark 3.0.1). For details, see the previous article in this series.

Data loading to EDA

Starting a session

#Session start
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("titanic").getOrCreate()

Data reading (csv)

#read csv data
train = spark.read.csv('./data/train.csv', header=True, inferSchema=True)
test = spark.read.csv('./data/test.csv', header=True, inferSchema=True)

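Data export (Parquet)

#Export DataFrame to Parquet format
train.write.mode('overwrite').parquet('./data/train_parquet')
test.write.mode('overwrite').parquet('./data/test_parquet')

#Data reload (Parquet stores the schema, so CSV options such as header and inferSchema are not needed)
train = spark.read.parquet('./data/train_parquet')
test = spark.read.parquet('./data/test_parquet')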

Check shape (number of rows, number of columns)

PySpark has no equivalent of Pandas' .shape. Get the number of rows with .count(). The .columns attribute returns the column names as a list, so count them with len().

#Confirm shape
train_shape = (train.count(), len(train.columns))
test_shape = (test.count(), len(test.columns))

print('train: {}'.format(train_shape))
print('test: {}'.format(test_shape))

train: (891, 12)
test: (418, 11)

Check the schema

Check the data type of each column with .printSchema(). The types shown are Spark SQL types (Spark runs on the JVM and is written in Scala), not Python types.

#Check the schema
train.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

Display a few rows to check the contents

The equivalent of Pandas' .head(), which shows a few rows to get an overview. PySpark offers two ways: .show() prints the DataFrame as a table, while .head(n) returns a list of Row objects. I find the latter easier to read when there are many columns or long text values.

#Display 5 rows as a table
train.show(5)

|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
only showing top 5 rows

#Display 5 rows as Row objects
train.head(5)

[Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S'),
 Row(PassengerId=2, Survived=1, Pclass=1, Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age=38.0, SibSp=1, Parch=0, Ticket='PC 17599', Fare=71.2833, Cabin='C85', Embarked='C'),
 Row(PassengerId=3, Survived=1, Pclass=3, Name='Heikkinen, Miss. Laina', Sex='female', Age=26.0, SibSp=0, Parch=0, Ticket='STON/O2. 3101282', Fare=7.925, Cabin=None, Embarked='S'),
 Row(PassengerId=4, Survived=1, Pclass=1, Name='Futrelle, Mrs. Jacques Heath (Lily May Peel)', Sex='female', Age=35.0, SibSp=1, Parch=0, Ticket='113803', Fare=53.1, Cabin='C123', Embarked='S'),
 Row(PassengerId=5, Survived=0, Pclass=3, Name='Allen, Mr. William Henry', Sex='male', Age=35.0, SibSp=0, Parch=0, Ticket='373450', Fare=8.05, Cabin=None, Embarked='S')]

Summary statistics

I often use .describe() in Pandas. PySpark has .describe() too, but it returns a DataFrame, so you need .show() to display it. By default, Pandas describes only numeric columns, whereas PySpark also includes string (categorical) columns.

#Display summary statistics
train.describe().show()

|summary|       PassengerId|           Survived|            Pclass|                Name|   Sex|              Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
|  count|               889|                889|               889|                 889|   889|              712|               889|                889|               889|              889|  202|     889|
|   mean|             446.0|0.38245219347581555|2.3115860517435323|                null|  null|29.64209269662921|0.5241844769403825|0.38245219347581555| 260763.9104704097|32.09668087739029| null|    null|
| stddev|256.99817277718313|0.48625968831477334|0.8346997785705753|                null|  null|14.49293290032352| 1.103704875596923| 0.8067607445174785|472255.95121695305|49.69750431670795| null|    null|
|    min|                 1|                  0|                 1|"Andersson, Mr. A...|female|             0.42|                 0|                  0|            110152|              0.0|  A10|       C|
|    max|               891|                  1|                 3|van Melkebeke, Mr...|  male|             80.0|                 8|                  6|         WE/P 5735|         512.3292|    T|       S|

Checking for missing values

The quickest check is the count row of .describe(). You can also filter each column with .isNull() or .isNotNull().

#Missing value confirmation
train.describe().first()  # train.describe().head() gives the same result

Row(summary='count', PassengerId='891', Survived='891', Pclass='891', Name='891', Sex='891', Age='714', SibSp='891', Parch='891', Ticket='891', Fare='891', Cabin='204', Embarked='889')

# Number of missing values in the 'Age' column
train.filter(train['Age'].isNull()).count()

# Number of non-missing values in the 'Age' column
train.filter(train['Age'].isNotNull()).count()


Separating categorical and numeric columns

Use .dtypes, which returns a list of (column name, data type) tuples, one per column.

train.dtypes

[('PassengerId', 'int'),
 ('Survived', 'int'),
 ('Pclass', 'int'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'double'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string')]

Here, I treat columns of type 'double' as numeric variables and columns of type 'string' or 'int' as categorical variables, and split them with list comprehensions as follows.

#Separate columns for categorical variables and columns for numeric variables
num_cols = [dtype[0] for dtype in train.dtypes if dtype[1] == 'double']
cat_cols = [dtype[0] for dtype in train.dtypes if dtype[1] != 'double']

#Remove only the target variable 'Survived'
cat_cols.remove('Survived')

print('Categorical variables:{}'.format(cat_cols))
print('Numeric variable:{}'.format(num_cols))

Categorical variables:['PassengerId', 'Pclass', 'Name', 'Sex', 'SibSp', 'Parch', 'Ticket', 'Cabin', 'Embarked']
Numeric variable:['Age', 'Fare']

Counting unique values in each column

The equivalent of Pandas' .nunique(). It can serve as a reference when selecting features or deciding how to encode categorical variables. PySpark DataFrames have no .nunique() method, so here I loop over the columns with a for statement. (Each iteration runs a separate Spark job, so this kind of loop does not play well with distributed processing.)

#Find out the unique number of categorical variables
nunique = {}

for col in cat_cols:
    value = train.select(col).distinct().count()
    nunique[col] = value

nunique

{'PassengerId': 891,
 'Pclass': 3,
 'Name': 891,
 'Sex': 2,
 'SibSp': 7,
 'Parch': 7,
 'Ticket': 681,
 'Cabin': 148,
 'Embarked': 4}

Examine the distribution of categorical variables

For example, to check whether the classes are imbalanced in a classification task.

# Check the distribution of 'Sex'
train.groupBy('Sex').count().show()

|   Sex|count|
|female|  314|
|  male|  577|

Create a pivot table

Implement this with .groupBy() and .pivot(). In this example, survival clearly differs by sex (a well-known result).

#Make a pivot table
train.groupBy('Survived').pivot('Sex').count().show()

|Survived|female|male|
|       1|   233| 109|
|       0|    81| 468|

You can also limit the pivot to specific values and aggregate a numeric column instead of counting rows, as follows. Passing the values explicitly also saves Spark an extra pass to find the distinct values.

# Average 'Age' for each combination of 'Survived' and 'Embarked'
# Limit the 'Embarked' columns to 'C', 'Q', 'S'
train.groupBy('Survived').pivot('Embarked', ['C', 'Q', 'S']).mean('Age').show()

|Survived|                 C|     Q|                 S|
|       1| 28.97367088607595|  22.5| 28.11318407960199|
|       0|33.666666666666664|30.325|30.203966005665723|

Examine the correlation coefficient (correlation matrix)

Use the DataFrame's .corr() method for the Pearson correlation coefficient between two columns. For a full correlation matrix, you can use Correlation in Spark ML's stat module (pyspark.ml.stat).

# Correlation coefficient between 'Age' and 'Fare'
train.corr('Age', 'Fare')



Splitting into training and validation data

The equivalent of Scikit-learn's train_test_split. In PySpark, it is the DataFrame method .randomSplit(). The weights act as sampling probabilities, so the split is only approximately 7:3 (651 vs. 240 rows here).

#Split the train data 7:3
df_train, df_valid = train.randomSplit([0.7,0.3], seed=2020)

print('df_train: {} rows'.format(df_train.count()))
print('df_valid:  {} rows'.format(df_valid.count()))

df_train: 651 rows
df_valid: 240 rows

Dropping unnecessary columns

In PySpark, the columns to be used are later combined into a single vector column before being fed to the model, so dropping unused columns here is not strictly necessary. It can be done as follows.

#Drop unnecessary columns
df_train = df_train.drop('PassengerId', 'Name', 'Cabin', 'Ticket')
df_valid = df_valid.drop('PassengerId', 'Name', 'Cabin', 'Ticket')

df_train.columns

['Survived', 'Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked']

Handling missing values (deletion / imputation)

dropna() deletes rows that contain missing values. It takes three arguments: how is 'any' or 'all' (default 'any'); thresh is an int (default None) that keeps only rows with at least that many non-null values and overrides how when set; subset restricts the check to the given columns (default None, meaning all columns).

# Delete rows where 'Embarked' is null
df_train = df_train.dropna(how='any', subset=['Embarked'])
df_valid = df_valid.dropna(how='any', subset=['Embarked'])


|summary|           Survived|            Pclass|   Sex|              Age|             SibSp|              Parch|             Fare|Embarked|
|  count|                889|               889|   889|              712|               889|                889|              889|     889|
|   mean|0.38245219347581555|2.3115860517435323|  null|29.64209269662921|0.5241844769403825|0.38245219347581555|32.09668087739029|    null|
| stddev|0.48625968831477334|0.8346997785705753|  null|14.49293290032352| 1.103704875596923| 0.8067607445174785|49.69750431670795|    null|
|    min|                  0|                 1|female|             0.42|                 0|                  0|              0.0|       C|
|    max|                  1|                 3|  male|             80.0|                 8|                  6|         512.3292|       S|

To impute with the mean or median, use Spark ML's Imputer. A new column, 'Age_imputed', is added on the far right.

# Impute the 'Age' column with its median
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['Age'],
    outputCols=['Age_imputed'],
    strategy='median'
)
model = imputer.fit(df_train)
df_train = model.transform(df_train)
df_valid = model.transform(df_valid)

df_train.describe().show()

|summary|           Survived|            Pclass|   Sex|               Age|             SibSp|             Parch|              Fare|Embarked|       Age_imputed|
|  count|                649|               649|   649|               521|               649|               649|               649|     649|               649|
|   mean|  0.386748844375963| 2.295839753466872|  null|29.335259117082533|0.5469953775038521|0.3913713405238829|31.948169645608576|    null|29.071910631741137|
| stddev|0.48738094472424587|0.8418076223501735|  null| 14.67636802626401|   1.1130653931477|0.7940671982196961|  46.4778648584037|    null| 13.15793243066804|
|    min|                  0|                 1|female|              0.42|                 0|                 0|               0.0|       C|              0.42|
|    max|                  1|                 3|  male|              80.0|                 8|                 6|          512.3292|       S|              80.0|

Standardization of numeric variables

Like Scikit-learn, Spark ML provides StandardScaler. However, Spark ML's StandardScaler takes a single vector column as input, so to standardize several columns at once you first combine them into one vector column with VectorAssembler and then standardize that. Also note that by default it only divides by the standard deviation (withStd=True) and does not subtract the mean (withMean=False), unlike Scikit-learn's default.

# Combine 'Age_imputed' and 'Fare' into a 'num_cols' vector column
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=['Age_imputed', 'Fare'],
    outputCol='num_cols'
)
df_train = assembler.transform(df_train)
df_valid = assembler.transform(df_valid)

from pyspark.ml.feature import StandardScaler

sc = StandardScaler(inputCol='num_cols', outputCol='num_cols_scaled')
sc_model = sc.fit(df_train)
df_train = sc_model.transform(df_train)
df_valid = sc_model.transform(df_valid)

df_train.select(['Age_imputed', 'Fare', 'num_cols', 'num_cols_scaled']).show(5)

|Age_imputed|   Fare|      num_cols|     num_cols_scaled|
|       22.0|   7.25|   [22.0,7.25]|[1.67199521018387...|
|       38.0|71.2833|[38.0,71.2833]|[2.88799172668123...|
|       26.0|  7.925|  [26.0,7.925]|[1.97599433930821...|
|       35.0|   53.1|   [35.0,53.1]|[2.65999237983797...|
|       35.0|   8.05|   [35.0,8.05]|[2.65999237983797...|
only showing top 5 rows
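As a sanity check on these numbers, a pure-Python sketch of StandardScaler's two modes: with withMean=False it divides by the corrected (n - 1) sample standard deviation only, and with withMean=True it also subtracts the mean first. Dividing the first passenger's 22.0 by the Age_imputed stddev from the df_train describe output (13.158) gives about 1.672, which matches num_cols_scaled above. The helper names are hypothetical.

```python
import statistics


def scale_std_only(values):
    """Mimic StandardScaler(withMean=False, withStd=True): divide by the sample std (n - 1)."""
    std = statistics.stdev(values)
    return [v / std for v in values]


def center_and_scale(values):
    """Mimic StandardScaler(withMean=True, withStd=True): subtract the mean, then divide by the sample std."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)
    return [(v - mean) / std for v in values]


print(scale_std_only([2.0, 4.0, 6.0]))    # sample stdev of these values is 2.0
print(center_and_scale([2.0, 4.0, 6.0]))

# First row above: Age_imputed = 22.0, stddev from the df_train describe output = 13.15793243066804
print(22.0 / 13.15793243066804)
```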

One-Hot encoding of categorical variables

One-Hot encoding is also implemented in Spark ML. Unlike StandardScaler, it can process several columns at once, but it only accepts numeric category indices. So string columns must first be converted to numeric indices with StringIndexer (that is, label-encoded) before OneHotEncoder.

# Convert 'Sex' and 'Embarked' to numeric indices
from pyspark.ml.feature import StringIndexer

idx_input = ['Sex', 'Embarked']
idx_output = [col + '_indexed' for col in idx_input]

indexer = StringIndexer(
    inputCols=idx_input,
    outputCols=idx_output
)
idx_model = indexer.fit(df_train)
df_train = idx_model.transform(df_train)
df_valid = idx_model.transform(df_valid)

df_train.select(['Sex', 'Sex_indexed', 'Embarked', 'Embarked_indexed']).show(5)

|   Sex|Sex_indexed|Embarked|Embarked_indexed|
|  male|        0.0|       S|             0.0|
|female|        1.0|       C|             1.0|
|female|        1.0|       S|             0.0|
|female|        1.0|       S|             0.0|
|  male|        0.0|       S|             0.0|
only showing top 5 rows
# One-Hot encoding
from pyspark.ml.feature import OneHotEncoder

ohe_input = ['Pclass', 'Sex_indexed', 'SibSp', 'Parch', 'Embarked_indexed']
ohe_output = [col + '_encoded' for col in ohe_input]

ohe = OneHotEncoder(
    inputCols=ohe_input,
    outputCols=ohe_output
)
ohe_model = ohe.fit(df_train)
df_train = ohe_model.transform(df_train)
df_valid = ohe_model.transform(df_valid)

# Check 'Embarked'
df_train['Embarked', 'Embarked_indexed', 'Embarked_indexed_encoded'].show(10)

|Embarked|Embarked_indexed|Embarked_indexed_encoded|
|       S|             0.0|           (2,[0],[1.0])|
|       C|             1.0|           (2,[1],[1.0])|
|       S|             0.0|           (2,[0],[1.0])|
|       S|             0.0|           (2,[0],[1.0])|
|       S|             0.0|           (2,[0],[1.0])|
|       Q|             2.0|               (2,[],[])|
|       S|             0.0|           (2,[0],[1.0])|
|       S|             0.0|           (2,[0],[1.0])|
|       C|             1.0|           (2,[1],[1.0])|
|       S|             0.0|           (2,[0],[1.0])|

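As shown, the output of One-Hot encoding is sparse: (2,[0],[1.0]) means a length-2 vector with 1.0 at index 0. 'Q', the last index, becomes the all-zero vector (2,[],[]) because OneHotEncoder drops the last category by default (dropLast=True).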
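To see where those vectors come from, a pure-Python sketch that mimics StringIndexer's default ordering (the most frequent label gets index 0) followed by OneHotEncoder with dropLast=True. It is an illustration, not Spark's implementation; the alphabetical tie-breaking is an assumption of this sketch. The input is the ten 'Embarked' values shown above.

```python
from collections import Counter


def string_index(values):
    """Assign indices by descending frequency (ties broken alphabetically in this sketch)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot_drop_last(index, num_categories):
    """Return (size, indices, values) like a SparseVector; the last category maps to all zeros."""
    size = num_categories - 1
    i = int(index)
    if i == size:
        return (size, [], [])
    return (size, [i], [1.0])


embarked = ["S", "C", "S", "S", "S", "Q", "S", "S", "C", "S"]
mapping = string_index(embarked)
print(mapping)  # {'S': 0.0, 'C': 1.0, 'Q': 2.0}
for label in ["S", "C", "Q"]:
    print(label, one_hot_drop_last(mapping[label], len(mapping)))
```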

Assembling the feature columns

One difference from Scikit-learn is that PySpark requires the features used to train the model to be combined into a single vector column. The common approach is to use VectorAssembler (also used for standardization above) and name the column 'features', which is the default featuresCol of Spark ML estimators such as LogisticRegression.

#Summarize features
assembler2 = VectorAssembler(
    inputCols=ohe_output + ['num_cols_scaled'],
    outputCol='features'
)
df_train = assembler2.transform(df_train)
df_valid = assembler2.transform(df_valid)

df_train.select(['Survived', 'features']).show(5)

|Survived|            features|
|       0|(22,[3,5,12,18,20...|
|       1|(22,[1,5,12,19,20...|
|       1|(22,[4,12,18,20,2...|
|       1|(22,[1,5,12,18,20...|
|       0|(22,[3,4,12,18,20...|
only showing top 5 rows

By the way, the categorical variables were sparse vectors and the numeric variables dense (ordinary) vectors, but the assembled result is stored as sparse vectors here. VectorAssembler appears to choose whichever representation is more compact for each row.

#Check data structure
df_train.select('features').head(2)
[Row(features=SparseVector(22, {3: 1.0, 5: 1.0, 12: 1.0, 18: 1.0, 20: 1.672, 21: 0.156})),
 Row(features=SparseVector(22, {1: 1.0, 5: 1.0, 12: 1.0, 19: 1.0, 20: 2.888, 21: 1.5337}))]
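The indices in these vectors can be decoded by laying the encoded columns end to end. A pure-Python sketch, assuming the assembler order Pclass, Sex, SibSp, Parch, Embarked, then num_cols_scaled (consistent with the indices shown), that OneHotEncoder treats each value 0..max as a category and drops the last one, and that the maxima are those in the df_train describe output (Pclass 3, SibSp 8, Parch 6). Under those assumptions the widths add up to 22, and the first row decodes to Sex=male, SibSp=1, Parch=0, Embarked=S plus the two scaled numbers; Pclass=3 leaves no entry because it is the dropped last category.

```python
# (encoded column, width) in assembler order.
# One-hot width = number of categories - 1 because of dropLast=True.
blocks = [
    ("Pclass_encoded", 4 - 1),            # categories 0..3 (value 0 never occurs)
    ("Sex_indexed_encoded", 2 - 1),       # male=0, female=1
    ("SibSp_encoded", 9 - 1),             # categories 0..8
    ("Parch_encoded", 7 - 1),             # categories 0..6
    ("Embarked_indexed_encoded", 3 - 1),  # S=0, C=1, Q=2
    ("num_cols_scaled", 2),               # scaled Age_imputed and Fare
]


def layout(blocks):
    """Return {name: (first_index, last_index)} and the total vector size."""
    spans, start = {}, 0
    for name, width in blocks:
        spans[name] = (start, start + width - 1)
        start += width
    return spans, start


def decode(indices, spans):
    """Map each active index to (block name, position within that block)."""
    decoded = []
    for i in indices:
        for name, (first, last) in spans.items():
            if first <= i <= last:
                decoded.append((name, i - first))
    return decoded


spans, total = layout(blocks)
print(total)  # 22
for name, pos in decode([3, 5, 12, 18, 20, 21], spans):
    print(name, pos)
```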

Model learning + evaluation

Logistic regression

This time I simply use logistic regression. Most other basic models, such as decision trees and linear SVMs (LinearSVC), are also implemented in Spark ML.

#Logistic regression
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol='Survived')
lr_model = lr.fit(df_train)

For data with known labels, .evaluate() returns a summary object of the results, and its .predictions attribute holds the prediction DataFrame.

#Get the prediction results
train_result = lr_model.evaluate(df_train)
valid_result = lr_model.evaluate(df_valid)

valid_result.predictions.select(['Survived', 'rawPrediction', 'probability', 'prediction']).show(5)

|Survived|       rawPrediction|         probability|prediction|
|       1|[-0.0107443246616...|[0.49731394467449...|       1.0|
|       0|[2.10818940159344...|[0.89169660088758...|       0.0|
|       0|[2.71630875457920...|[0.93798215479564...|       0.0|
|       1|[-0.2429596953280...|[0.43955710975950...|       1.0|
|       0|[1.81502375560081...|[0.85996794511927...|       0.0|
only showing top 5 rows

Calculating the AUC score

Model evaluation uses the classes in the pyspark.ml.evaluation module. For the AUC score of a binary classifier like this one, use BinaryClassificationEvaluator.

#Model Evaluation (AUC)
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluater = BinaryClassificationEvaluator(
    labelCol='Survived',
    metricName='areaUnderROC'  #AUC is the default metric
)

auc_train = evaluater.evaluate(train_result.predictions)
auc_valid = evaluater.evaluate(valid_result.predictions)

print('AUC score')
print('train:', auc_train)
print('valid:', auc_valid)

AUC score
train: 0.7889497287232977
valid: 0.8065704293474217

To use metrics such as 'f1', 'accuracy', 'weightedPrecision', or 'weightedRecall', use MulticlassClassificationEvaluator.

#Model evaluation (F1)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluater = MulticlassClassificationEvaluator(
    labelCol='Survived',
    metricName='f1'
)

f1_train = evaluater.evaluate(train_result.predictions)
f1_valid = evaluater.evaluate(valid_result.predictions)

print('f1 score')
print('train:', f1_train)
print('valid:', f1_valid)

f1 score
train: 0.8032854491383006
valid: 0.8270897150503879

Predicting the test data (unseen data)

Imputing missing values (mode)

Rows of the data we need to predict should not be dropped, so missing 'Embarked' values are filled here with the mode, 'S'.

# Fill null 'Embarked' values with the mode
train = train.fillna({'Embarked': 'S'})
test = test.fillna({'Embarked': 'S'})

Combining the steps into a Pipeline

The steps used up to model evaluation are combined into a Pipeline. It works essentially like Scikit-learn's Pipeline.

#Pipeline settings
from pyspark.ml import Pipeline, PipelineModel

stages = [
    imputer,     #Impute missing 'Age' values
    assembler,   #Combine 'Age_imputed' and 'Fare' into one vector
    sc,          #Standardize 'Age_imputed' and 'Fare'
    indexer,     #Convert 'Sex' and 'Embarked' to numeric indices
    ohe,         #One-Hot encoding
    assembler2,  #Assemble the features to be used
    lr           #Logistic regression
]

pipeline = Pipeline(stages=stages)
#Model learning
pipeline_model = pipeline.fit(train)

#Save model
model_path = './model/lr_base'
pipeline_model.write().overwrite().save(model_path)

#Model reload
pipeline_model = PipelineModel.load(model_path)

train_result = pipeline_model.transform(train)
test_result = pipeline_model.transform(test)
#Display the prediction results
submission = test_result.withColumn('Survived', test_result['prediction'].cast('int'))
submission = submission.select('PassengerId', 'Survived')
submission.show()

|PassengerId|Survived|
|        892|       0|
|        893|       0|
|        894|       0|
|        895|       0|
|        896|       1|
|        897|       0|
|        898|       1|
|        899|       0|
|        900|       1|
|        901|       0|
|        902|       0|
|        903|       0|
|        904|       1|
|        905|       0|
|        906|       1|
|        907|       1|
|        908|       0|
|        909|       0|
|        910|       1|
|        911|       1|
only showing top 20 rows

Conclusion

Using the Titanic competition as a theme, I implemented a basic machine learning workflow in PySpark. Spark ML seems strongly influenced by Scikit-learn and has many similarities, but it has a few quirks: StandardScaler takes only a single (vector) column as is, and string columns must be label-encoded before One-Hot encoding. Next, I would like to try external Spark packages as well as Spark ML.

