[PYTHON] Performance verification of data preprocessing for machine learning (numerical data) (Part 2)

First edition: 2020/3/18
Authors: Soichi Takashige, Masahiro Ito, Hitachi, Ltd.

Introduction

In this series, we introduce design know-how for data preprocessing and the results of performance verification of data preprocessing when designing a system that incorporates a machine learning model.

In this third installment, we introduce know-how for improving performance, and the corresponding verification results, for data preprocessing using Spark, a parallel distributed processing platform.

** Post list: **

  1. About data preprocessing of the system using machine learning
  2. Performance verification of data preprocessing for machine learning (numerical data) (Part 1)
  3. Performance verification of data preprocessing for machine learning (numerical data) (Part 2) (this post)

Utilization of Spark for data preprocessing

In the previous post, we showed that when a large amount of data is preprocessed with Python on a single node, the process runs out of memory. In such cases, it is often effective to use a parallel distributed processing platform. This time, we introduce data preprocessing with Spark, a typical parallel and distributed processing platform.

Rewriting processing according to the data processing method

In the development of a system that utilizes machine learning, as described in the first post, a PoC is first performed to confirm the effectiveness of machine learning, and the production system is then developed based on that result. At the PoC stage, data preprocessing is often implemented in Python. Therefore, if you want to use Spark when developing the production system, you need to rewrite the Python code for Spark. This time, the data preprocessing for BigBench business scenario #5, implemented with Pandas data frames, was rewritten into Spark processing according to the following guidelines.

```python:Before(pandas)
import numpy as np
import pandas as pd
```

```python:After(spark)
import pyspark
import pyspark.sql
from pyspark.sql import SparkSession
from pyspark.sql import HiveContext
from pyspark.context import SparkContext
spark = SparkSession.builder.master(…).getOrCreate()  # The connection settings with Spark are described here
sc = spark.sparkContext
hive_context = HiveContext(sc)
```

```python:Before(pandas)
df = pd.DataFrame()
```

```python:After(spark)
df = spark.createDataFrame(data, schema)
```

```python:Before(pandas)
df = pd.read_csv(filename)
```

```python:After(spark)
df = spark.read.csv(filename)
```

```python:Before(pandas)
df.loc[df["columnname"].notnull(), :]
```

```python:After(spark)
df.filter(df["columnname"].isNotNull())
```

```python:Before(pandas)
df.loc[rownumber]
```

```python:After(spark)
# Access to a specific row is slow in Spark, so modify the logic so that row access is not required
```

The way you access a particular column of data is the same in Pandas and Spark:

```python
df['columnname']
```

```python:Before(pandas)
df.loc[df['columnname'] == value]
```

```python:After(spark)
df.filter(df['columnname'] == value)
```

```python:Before(pandas)
pd.merge(df1, df2, how='inner', left_on='l-key', right_on='r-key')
```

```python:After(spark)
df1.join(df2, df1.l_key == df2.r_key, 'inner')
```

```python:Before(pandas)
def function(df):
    # Do something
    return data

df.groupby("key").apply(function)
```

```python:After(spark)
out_schema = StructType(...)  # out_schema is the definition of the data schema output by the function

@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
def function(df):
    # Do something (depending on the Pandas UDF specifications, some rewriting may be necessary; described later)
    return data

df.groupby("key").apply(function)
```

Rewriting processing with Pandas UDF

Pandas UDF is one way to execute Python processing that uses Pandas inside Spark. It is a Python-Spark cooperation mechanism provided by Spark, which makes it possible to run processing implemented in Python (Pandas) in a parallel and distributed manner on Spark. The three types of functions shown in Table 1 are available with Pandas UDF and can be called from Spark dataframe operations.

Table 1 Types of Python functions that can be used with Pandas UDF

| # | Processing function type | Processing that the function can perform | Use scene |
| --- | --- | --- | --- |
| 1 | SCALAR | Receives one `pandas.Series`, processes it, and returns one `pandas.Series`. The element names and types must match between input and output. | Iterative processing applied to a data column (`map`, `apply_along_axis`) |
| 2 | AGG | Receives one or more `pandas.Series` and, after some processing, returns one `pandas.Series`. The number of input and output elements does not have to match. | Aggregation processing (`agg`) |
| 3 | GROUPED_MAP | Receives one `pandas.DataFrame`, performs some processing on each element, and returns another `pandas.DataFrame`. The rows, column names, etc. of input and output do not have to match. | Processing applied to each group of a grouped dataset (`groupby.apply`) |
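
As a point of reference, here is a minimal sketch of how a SCALAR-type function from Table 1 can be defined and called from a Spark dataframe operation. The dataframe `df`, its `price` column, and the `add_tax` function are hypothetical; the Spark 2.4 API used in this verification is assumed.

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType

# SCALAR type: receives one pandas.Series and returns one pandas.Series of the same length
@pandas_udf(DoubleType(), PandasUDFType.SCALAR)
def add_tax(price):
    # "price" arrives as a pandas.Series; the processing is applied element-wise and vectorized
    return price * 1.1

# Called like a normal column expression on a Spark dataframe ("df" and "price" are hypothetical)
df = df.withColumn("price_with_tax", add_tax(df["price"]))
```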

When `groupby.apply` processing is written with Pandas, registering the Python function passed to that `apply` as a GROUPED_MAP function as described above enables parallel execution by Spark without the man-hours of reimplementation. However, the `apply` functions of Pandas and Spark have slightly different specifications and may require some adjustments. In this implementation, we had to deal with the following two differences. In the explanation, `df` is the variable name of the data frame passed to the function, `keyname` is the column name used for grouping, and `function` is the name of the function that defines the iterative processing.

  1. ** Difference in argument values **
Difference in behavior: When `df.groupby("keyname").apply(function)` is executed, the data is first grouped by rows that share the same value of `keyname`, and each group is gathered into one data frame. In both Pandas and Spark, the `DataFrame` object of the respective library is passed as the argument of the function `function(df)` specified in `apply`. In Pandas, this `DataFrame` has a property called `df.name` that returns the `keyname` value of the group. Spark, on the other hand, does not provide such a property.
Countermeasure: The data frame passed to `function(df)` still contains the `keyname` column, so the equivalent of `name` can be obtained with `name = df["keyname"][0]`.
  2. ** Difference in handling of the return value **
Difference in behavior: In Pandas, the result of `df.groupby("keyname").apply(function)` is interpreted differently depending on the column names and data types contained in the return value. In particular, the `groupby` function automatically adds the `keyname` column as an index to the output data, and the results are aggregated into a `Series` or `DataFrame`. Spark, on the other hand, does not perform such automatic column completion.
Countermeasure: The return value of `function(df)` must always contain a column named `keyname` that holds the `keyname` value of the original data.
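
To make these two differences concrete, the following is a minimal sketch of a GROUPED_MAP function written for Spark. The column names `keyname` and `value`, the output schema, and the dataframe `spark_df` are hypothetical; the group key is recovered from the data itself instead of `df.name`, and the key is explicitly included in the returned data frame.

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

# Hypothetical output schema: the group key plus one aggregated value
out_schema = StructType([
    StructField("keyname", IntegerType(), True),
    StructField("value_sum", DoubleType(), True),
])

@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
def function(df):
    # Difference 1: Spark has no df.name, so take the key from the data itself
    # (equivalent to name = df["keyname"][0] in the explanation above)
    name = df["keyname"].iloc[0]
    # Difference 2: the returned DataFrame must itself contain the keyname column
    return pd.DataFrame([[name, df["value"].sum()]], columns=["keyname", "value_sum"])

# Usage (spark_df is a hypothetical Spark dataframe):
# result = spark_df.groupby("keyname").apply(function)
```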

BigBench Business Scenario # 5 Spark Code Example

As a result of rewriting the Python implementation of data preprocessing for BigBench business scenario #5 shown in the second post into Spark, using the know-how described so far, we obtained the code shown in Figure 1 below.

```python
import pandas as pd
import numpy as np

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import udf, pandas_udf, PandasUDFType, when
from pyspark.sql.types import *
sc = SparkContext.getOrCreate()
hive_context = HiveContext(sc)

web_clickstreams = hive_context.read.table("bigbench.web_clickstreams")
item = hive_context.read.table("bigbench.item")
customer = hive_context.read.table("bigbench.customer")
customer_demographics = hive_context.read.table("bigbench.customer_demographics")

#Processing ①
data = web_clickstreams.filter(web_clickstreams["wcs_user_sk"].isNotNull())
data = data.join(item, data["wcs_item_sk"] == item["i_item_sk"], 'inner')

#Processing ②: Group by user ID
grouped_users = data.groupby('wcs_user_sk')

#Process ③ Type definition: Define the output data type of iterative processing
types =  ["wcs_user_sk", "clicks_in_category"]+["clicks_in_%d"%i for i in range(1,8)]
out_schema = StructType([StructField(i, IntegerType(), True) for i in  types])

#Process ③ Registration: Register the contents of the iterative process as a function in Pandas UDF
@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
def summarize_per_user(wcs_user_sk_contents):
    wcs_user_sk_index = wcs_user_sk_contents['wcs_user_sk'][0]
    #Processing ③-1, ③-2
    clicks_in_category = \
        len(wcs_user_sk_contents[wcs_user_sk_contents['i_category'] == i_category_index])
    clicks_in = [0] * 8
    for name, df in wcs_user_sk_contents.groupby('i_category_id'):  # Loop optimized into a single pass
        if name < len(clicks_in):
            clicks_in[name] = len(df.index)
    #Processing ③-3
    return pd.DataFrame([[wcs_user_sk_index, clicks_in_category] + clicks_in[1:]],
                        columns=types)

#Process ③ Execution
i_category_index = 'Books'
data = grouped_users.apply(summarize_per_user)

#Processing ④
data = data.join(customer, data["wcs_user_sk"] == customer["c_customer_sk"], 'inner')

#Processing ⑤
data = data.join(customer_demographics, \
     data["c_current_cdemo_sk"] == customer_demographics["cd_demo_sk"], 'inner')

#Processing ⑥
data = data.withColumn('college_education',
                       when(data["cd_education_status"] == 'Advanced Degree', 1)\
                      .when(data["cd_education_status"] == 'College', 1)\
                      .when(data["cd_education_status"] == '4 yr Degree', 1)\
                      .when(data["cd_education_status"] == '2 yr Degree', 1)\
                      .otherwise(0))
data = data.withColumn('male', when(data["cd_gender"] == 'M', 1).otherwise(0))

#Save results
data.write.mode('append').parquet('answer_q05_0100.parquet')
```

Figure 1 Data preprocessing source code for BigBench scenario # 5 by Spark

Effect verification using Spark

From here, we verify the performance of the data preprocessing by Spark implemented in Figure 1. Figure 2 below shows the service layout of the Spark cluster built as the environment for parallel and distributed processing in this verification. This time, we built a Spark cluster using the Cloudera distribution, assuming an on-premises use case. One of the Worker Nodes was also used for the verification of single-node processing by Python described in the second post. When processing with Spark, the actual parallel and distributed processing is performed on the 3 Worker Nodes.


Figure 2 Service layout of this verification environment

Next, Table 2 shows the specifications of this verification environment. We used IaaS (EC2 instances) on AWS as the verification machines. Five 1TB HDDs (EBS) are attached to each Worker Node EC2 instance, giving 5TB of capacity per node (15TB in total). However, since HDFS replicates data three times, the effective data capacity is about 5TB.

Table 2 Hardware specifications of verification environment

|  | Manager Node | Master Node | Worker Node × 3 |
| --- | --- | --- | --- |
| Verification environment | AWS EC2 | AWS EC2 | AWS EC2 |
| OS | CentOS 7 64bit | CentOS 7 64bit | CentOS 7 64bit |
| CPU (number of cores) | 2 | 4 | 96 (32 × 3 nodes) |
| Memory (GB) | 16 | 32 | 768 (256 GB × 3 nodes) |
| HDD | 80 GB | 80 GB | 15 TB (1 TB HDD × 5 × 3 nodes) |

The software versions used for verification are shown in Table 3 below.

Table 3 Software version of verification environment

| Software | Version |
| --- | --- |
| Cloudera distribution | CDH 6.3.0 |
| Spark | 2.4.0 |
| Hive | 2.1.1 |
| YARN | 2.5.0 |
| HDFS | 3.0.0 |
| Python | 3.7.3 |
| Pandas | 0.24.2 |
| Numpy | 1.16.4 |

Processing method to compare

In addition to the performance measurement results for processing methods 1 and 2 below, which were verified in the second post, we measure and compare the performance of method 3, in which parallel distributed processing is performed by Spark.

  1. Single-node processing with Python (without the logic optimization of Figure 5 in the second post)

Run the code in Figure 4 of the second post on Python.

  2. Single-node processing with Python (with the logic optimization of Figure 5 in the second post)

Run the code in Figure 4 of the second post with the optimization of Figure 5 applied, on Python.

  3. Parallel and distributed processing with Spark (with the logic optimization of Figure 5 in the second post)

Run parallel and distributed processing on Spark using the code shown in Figure 1, which is the logic-optimized single-node Python code used in 2. rewritten for Spark processing.

Execution parameters during processing in Spark

The Spark execution parameters for executing tasks are set as shown in Table 4. Each Worker Node starts one worker process (Executor), which is allocated so that it can use the node's memory and cores exclusively.

Table 4 Spark execution parameters

| # | Item | Set value | Remarks |
| --- | --- | --- | --- |
| 1 | Number of Executors | 3 | One Executor is started on each Worker Node |
| 2 | Executor memory size | 128 GB | |
| 3 | Number of cores per Executor | 30 | 30 of each machine's 32 cores are allocated |
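
As a reference, the following is a minimal sketch of how the values in Table 4 might be passed when building the Spark session on YARN. The configuration keys are standard Spark properties; the application name is a placeholder, and the parameters could equally be given as spark-submit options.

```python
from pyspark.sql import SparkSession

# Sketch: passing the Table 4 parameters as standard Spark properties
spark = (SparkSession.builder
         .appName("bigbench_q05_preprocessing")     # placeholder application name
         .master("yarn")                            # running on the YARN cluster of this environment
         .config("spark.executor.instances", "3")   # number of Executors
         .config("spark.executor.memory", "128g")   # Executor memory size
         .config("spark.executor.cores", "30")      # cores per Executor
         .getOrCreate())
```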

Processing content and data to be measured

In the measurement, the total time required for the following three processes will be measured in the same way as the measurement for the second post.

  1. Reading data from a data source into memory

  2. Preprocessing such as data combination and aggregation for the read data

  3. Write the processing result to the data store

In addition, the data to be measured uses the same settings as in the second post (see Table 3 of the second post).

Performance measurement results

Figure 3 shows the results of evaluating the processing time of each of the processing methods described above for each data size, for BigBench business scenario #5.


Figure 3 Data preprocessing time measurement results for each input data size

For single-node processing with Python, as shown in the second post, when the input data size is increased, processing becomes impossible due to lack of memory at a data set size smaller than the production data set (about 50 GB). On the other hand, when parallel and distributed processing was performed with Spark, the processing completed normally even with the data set assumed for production, and could also be completed for larger sizes.

Figure 4 shows the progress of CPU, memory, and disk I/O usage when processing 22GB of data with Spark. It can be confirmed that the calculation time was greatly reduced because all the CPUs of each Worker Node were used at 100%. Memory usage is also distributed across the nodes. Since each of the three nodes carries the overhead of the underlying OS functions, the memory usage of the entire cluster is larger than that of a single machine: the memory used after starting the business scenario #5 program is about 130-140GB per Worker Node, or about 410GB in total for the 3 nodes. In addition, you can see disk write I/O occurring as a result of processing ① (inner join) and processing ② (group by) (see Figure 3 of the second post). This I/O occurs because the joined and sorted data is written to disk.


Figure 4 Temporal changes in CPU, memory, and I / O usage in a Spark environment

Consideration of performance evaluation results

Effect of introducing Spark

[Effect of parallel distribution (1): Support for large-scale data]

With the introduction of Spark, it became possible to perform preprocessing on large-scale data that would be forcibly terminated due to insufficient memory when executed with Python on a single server. When dealing with data that exceeds the memory capacity of a single machine, introducing Spark is considered appropriate.

[Effect of parallel distribution (2): Reduction of processing time]

Python (Pandas) cannot take advantage of multiple CPUs, so even as the data size increases, processing is executed sequentially on a single CPU. In Spark, on the other hand, the data is divided across nodes, and the processing for each portion of the data is assigned to different nodes and CPU cores for parallel processing, so the processing time can be greatly reduced.

About the features of Python and Spark

  • When using Pandas on Python, all the data to be processed is read into memory and processed sequentially by a single CPU. In the example of business scenario #5, the processing time therefore increases in proportion to the data size.

  • In processing with Spark, data is read into memory and written to disk partition by partition, where each partition is a chunk of a certain size. In addition, whenever processing that requires data exchange between nodes (JOIN, GROUP BY, sorting, etc.) is performed, the intermediate results are written to disk. By processing large data partition by partition in this way, the processing can be completed even when the data size exceeds the amount of physical memory installed; however, disk I/O then becomes a bottleneck and the processing time can increase more than linearly (see the sketch after this list for how partitioning can be observed and adjusted).

  • In Figure 3, the processing time increases non-linearly when the input data size is about 150GB. This is because the data size to be handled exceeds the installed memory amount, so disk I/O occurs and becomes a bottleneck.
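
As a reference, here is a minimal sketch of how the partitioning described above can be observed and adjusted in PySpark. `data` refers to the dataframe from Figure 1, `spark` to a SparkSession (for example, one built as in the sketch after Table 4), and the value 90 is only an illustrative choice, not a tuned setting.

```python
# Number of partitions the dataframe is currently split into across the cluster
print(data.rdd.getNumPartitions())

# Number of partitions produced by shuffling operations such as JOIN and GROUP BY
print(spark.conf.get("spark.sql.shuffle.partitions"))  # Spark's default is "200"

# The shuffle partition count can be adjusted before joins/aggregations are run,
# e.g. to a multiple of the available cores (3 Executors x 30 cores in this environment)
spark.conf.set("spark.sql.shuffle.partitions", "90")
```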

Summary

We introduced the results of a performance evaluation, and the know-how for improving performance obtained along the way, for a sample workload that performs preprocessing such as joining and aggregation on large-scale tabular data.

If the data to be processed is not so large (up to several tens of GB), preprocessing with Python on a single node can be completed within a few hours, so performing the data preprocessing with Python preprocessing code is feasible. In that case, the processing time can be shortened by applying the logic optimizations introduced in this series.

On the other hand, if the amount of data to be processed is large, Python may take too long or the process may fail, so applying a distributed processing platform such as Spark is a realistic option. We confirmed that, when the training data used for machine learning is about 22GB, using parallel distributed processing with Spark for preprocessing shortens the processing time by about 94% compared to Python running on a single node (single thread). We also confirmed that, with Spark on 3 Worker Nodes, data up to about 4 times the amount that Python can handle on a single node can be processed while maintaining performance.
