Introduction to Machine Learning with Spark "Price Estimation" #3: Building a "Price Estimation Engine" by Learning from Training Data

Overview

- An introduction to machine learning using **Java** and **Apache Spark**
- Hands-on, step by step, from how to use **Apache Spark** to actual machine learning (training and regression), using "price estimation" as the theme
- Uses the **Gradient Boosting Tree**, a **supervised learning** algorithm, as the machine learning algorithm
- The step-by-step explanation is split across multiple posts
- Full source code: https://github.com/riversun/spark-gradient-boosting-tree-regression-example

Environment

Create a machine learning model

We have finally reached the machine learning phase. This time, we will create a "price estimation engine" that actually works.

Last time, we used a pipeline to replace the categorical variables represented as strings with numeric values.

Because the numerically encoded categorical variables were given column names with **Index** appended as a suffix, the Dataset now has **materialIndex, shapeIndex, brandIndex, shopIndex, weight, price** as numeric variables.

Explanatory variables and objective variables

In this example, we want to estimate (predict) the price of an accessory. The variable to be predicted (here, **price**) is called the **objective variable**.

When you want to estimate the price of an accessory, what determines that price?

If the accessory is made of diamond, it will be expensive, and the heavier the gem or precious metal, the more valuable it will be. Also, if the product is made by a famous brand, it is likely to command a premium.

In other words, there are reasons, that is, **causes**, that push the price of an accessory up or down.

The data representing these causes is called the **explanatory variables**.

Cause: explanatory variable
Result: objective variable

So the variables in the Dataset are divided into the explanatory variables and the objective variable as follows.

Explanatory variables: **materialIndex, shapeIndex, brandIndex, shopIndex, weight**
Objective variable: **price**


From now on, we will use this data for learning.

Learning means discovering and expressing, with mathematical formulas and **learning algorithms**, the relationships and rules between the explanatory variables and the objective variable (though not always in a form humans can understand). A wide variety of methods have been proposed for this.

The relationship between the explanatory variables and the objective variable acquired through learning is called a **learning model** (or machine learning model, or simply a model).

Of course, uncovering and approximating unknown relationships requires a considerable amount of data, so it is said that in machine learning **the quantity and quality of the data** matter as much as, or more than, the algorithm.

This time, we will use a **gradient boosting tree** as the learning algorithm and **500 records of accessory price data (with no missing values)** as the data.

Regression

When the objective variable you want to predict (here, the price) is a continuous value, as in this case, the prediction is called **regression**.

A learning model for regression is called a **Regressor**.

On the other hand, when the objective variable is binary or one of multiple classes (categories), the task is called **classification**, and a learning model for classification is called a **Classifier**.

Create a feature vector

To use the variables for learning, we combine multiple variables (columns) into a single column.


import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class GBTRegressionStep03_part01 {

  public static void main(String[] args) {

    System.setProperty("hadoop.home.dir", "c:\\Temp\\winutil\\");// for windows

    org.apache.log4j.Logger.getLogger("org").setLevel(org.apache.log4j.Level.ERROR);
    org.apache.log4j.Logger.getLogger("akka").setLevel(org.apache.log4j.Level.ERROR);

    SparkSession spark = SparkSession
        .builder()
        .appName("GradientBoostingTreeRegression")
        .master("local[*]")
        .getOrCreate();

    Dataset<Row> dataset = spark
        .read()
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load("dataset/gem_price_ja.csv");

    List<String> categoricalColNames = Arrays.asList("material", "shape", "brand", "shop");

    List<StringIndexer> stringIndexers = categoricalColNames.stream()
        .map(col -> new StringIndexer()
            .setStringOrderType("frequencyDesc")
            .setInputCol(col)
            .setOutputCol(col + "Index"))
        .collect(Collectors.toList());

    String[] indexedCategoricalColNames = stringIndexers// (1)
        .stream()
        .map(StringIndexer::getOutputCol)
        .toArray(String[]::new);

    String[] numericColNames = new String[] { "weight" };// (2)

    VectorAssembler assembler = new VectorAssembler()// (3)
        .setInputCols(array(indexedCategoricalColNames, numericColNames))
        .setOutputCol("features");

    PipelineStage[] indexerStages = stringIndexers.toArray(new PipelineStage[0]);// (5)

    PipelineStage[] pipelineStages = array(indexerStages, assembler);// (6)

    Pipeline pipeline = new Pipeline().setStages(pipelineStages);// (7)

    pipeline.fit(dataset).transform(dataset).show(10);// (8)

  }

  @SuppressWarnings("unchecked")
  public static <T> T[] array(final T[] array1, final T... array2) {
    final Class<?> type1 = array1.getClass().getComponentType();
    final T[] joinedArray = (T[]) Array.newInstance(type1, array1.length + array2.length);
    System.arraycopy(array1, 0, joinedArray, 0, array1.length);
    System.arraycopy(array2, 0, joinedArray, array1.length, array2.length);
    return joinedArray;
  }
}

Code commentary

**(1)**: Builds an array of the output column names (outputCol) of the ***StringIndexer***s. The result is an array of the column names of the numerically encoded categorical variables, as shown below.

[materialIndex,shapeIndex,brandIndex,shopIndex]
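For reference, the values in these ...Index columns come from StringIndexer with setStringOrderType("frequencyDesc"), which gives index 0.0 to the most frequent label, 1.0 to the next most frequent, and so on. The plain-Java sketch below mimics that ordering; the class name and sample values are hypothetical, and breaking ties alphabetically is an assumption of this sketch, not a statement about Spark's behavior.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java sketch of "frequencyDesc" string indexing (not Spark code).
class FrequencyDescIndexSketch {

    // Most frequent label -> 0.0, next -> 1.0, ... (ties broken alphabetically: an assumption).
    static Map<String, Double> fitLabels(List<String> values) {
        Map<String, Integer> counts = new HashMap<>();
        for (String v : values) {
            counts.merge(v, 1, Integer::sum);
        }
        List<String> labels = new ArrayList<>(counts.keySet());
        labels.sort(Comparator.<String>comparingInt(counts::get).reversed()
                .thenComparing(Comparator.<String>naturalOrder()));
        Map<String, Double> index = new HashMap<>();
        for (int i = 0; i < labels.size(); i++) {
            index.put(labels.get(i), (double) i);
        }
        return index;
    }

    public static void main(String[] args) {
        // Made-up material values, not the real dataset.
        List<String> materials = List.of("silver", "gold", "silver", "platinum", "silver", "gold");
        Map<String, Double> index = fitLabels(materials);
        System.out.println(index.get("silver") + " " + index.get("gold") + " " + index.get("platinum"));
        // 0.0 1.0 2.0
    }
}
```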

**(2)**: An array of the names of the numeric-variable columns. This time there is only **weight**.

**(3)**: Combines multiple numeric variables into a single vector.

***VectorAssembler#setInputCols*** specifies the columns to vectorize. Here, all the columns that are candidates for explanatory variables (**materialIndex, shapeIndex, brandIndex, shopIndex, weight**) are specified[^1]. The ***array*** function is a helper that joins arrays. The vectorized result is added to the Dataset as a column with the name given to ***setOutputCol***.

[^1]: Depending on the machine learning algorithm, a model may not learn well unless you understand the explanatory variables and select among them (feature selection). For example, high correlation between explanatory variables can cause multicollinearity. This time the algorithm is tree-based (a nonlinear model) and this is an introductory article, so all candidate explanatory variables are included.

VectorAssembler assembler = new VectorAssembler()// (3)
    .setInputCols(array(indexedCategoricalColNames, numericColNames))
    .setOutputCol("features");
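To see what the ***array*** helper produces, it can be copied into a standalone class and run on its own (the class name ArrayJoinDemo is hypothetical; the helper body is the one from the listing). Because the joined array takes its component type from the first argument, appending an element of an unrelated type throws ArrayStoreException at runtime. This is presumably why (5) converts the indexers into a PipelineStage[] before the VectorAssembler (and later the GBTRegressor) is appended.

```java
import java.lang.reflect.Array;
import java.util.Arrays;

// Standalone copy of the article's array(...) helper, plus a small demonstration.
class ArrayJoinDemo {

    // Joins array1 and array2 into a new array whose component type is that of array1.
    @SuppressWarnings("unchecked")
    public static <T> T[] array(final T[] array1, final T... array2) {
        final Class<?> type1 = array1.getClass().getComponentType();
        final T[] joinedArray = (T[]) Array.newInstance(type1, array1.length + array2.length);
        System.arraycopy(array1, 0, joinedArray, 0, array1.length);
        System.arraycopy(array2, 0, joinedArray, array1.length, array2.length);
        return joinedArray;
    }

    public static void main(String[] args) {
        String[] indexedCategoricalColNames = { "materialIndex", "shapeIndex", "brandIndex", "shopIndex" };
        String[] numericColNames = { "weight" };
        String[] inputCols = array(indexedCategoricalColNames, numericColNames);
        System.out.println(Arrays.toString(inputCols));
        // [materialIndex, shapeIndex, brandIndex, shopIndex, weight]
    }
}
```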

**(5)-(7)**: Set the **StringIndexer**s and the **VectorAssembler** as the stages of a **Pipeline**.

PipelineStage[] indexerStages = stringIndexers.toArray(new PipelineStage[0]);// (5)
PipelineStage[] pipelineStages = array(indexerStages, assembler);// (6)
Pipeline pipeline = new Pipeline().setStages(pipelineStages);// (7)
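Conceptually, fitting a Pipeline runs its stages in order: each stage is fitted on the data as transformed by the stages before it, and the fitted stages together form the pipeline model, which is later applied to new data. The plain-Java sketch below models only that idea; the Stage interface, the toy centering and scaling stages, and all names are hypothetical and are not Spark's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

// Hypothetical, simplified model of Pipeline.fit / PipelineModel.transform (not Spark's API).
class PipelineSketch {

    // Stand-in for an estimator: learns from data and returns a fitted transformer.
    interface Stage<D> {
        UnaryOperator<D> fit(D data);
    }

    static <D> UnaryOperator<D> fitPipeline(List<Stage<D>> stages, D trainingData) {
        List<UnaryOperator<D>> fittedStages = new ArrayList<>();
        D current = trainingData;
        for (Stage<D> stage : stages) {
            UnaryOperator<D> fitted = stage.fit(current);
            fittedStages.add(fitted);
            current = fitted.apply(current); // the next stage is fitted on transformed data
        }
        // The "pipeline model": apply every fitted stage, in order, to new data.
        return data -> {
            D out = data;
            for (UnaryOperator<D> fitted : fittedStages) {
                out = fitted.apply(out);
            }
            return out;
        };
    }

    // Toy stage 1: learn the mean of the data it is fitted on, then subtract it.
    static Stage<List<Double>> center() {
        return data -> {
            double mean = data.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
            return d -> d.stream().map(v -> v - mean).collect(Collectors.toList());
        };
    }

    // Toy stage 2: learn the largest absolute value, then divide by it.
    static Stage<List<Double>> scale() {
        return data -> {
            double maxAbs = data.stream().mapToDouble(v -> Math.abs(v)).max().orElse(1.0);
            double divisor = maxAbs == 0.0 ? 1.0 : maxAbs;
            return d -> d.stream().map(v -> v / divisor).collect(Collectors.toList());
        };
    }

    public static void main(String[] args) {
        List<Stage<List<Double>>> stages = List.of(center(), scale());
        UnaryOperator<List<Double>> model = fitPipeline(stages, List.of(1.0, 3.0, 5.0));
        // Parameters learned on the training data (mean 3.0, then max |x - mean| = 2.0) are reused.
        System.out.println(model.apply(List.of(7.0))); // [2.0]
    }
}
```

The key point mirrors the Spark code: what each stage learns comes only from the data passed to fit, and the same learned parameters are then applied to any data passed to the fitted model.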

**(8)**: Runs the pipeline (fit, then transform) and shows the first 10 rows.

The execution result is as follows.

(Figure: output of show(10), with the added features column)

You can see that a vector column called **features** has been added.

That is, ***VectorAssembler*** combines the multiple variables **materialIndex, shapeIndex, brandIndex, shopIndex, weight** into a single vector column named "**features**".


Data like this **features** column is called a **feature vector**.
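To make the idea concrete, here is a plain-Java sketch (not Spark code; the class and method names are hypothetical) of what VectorAssembler does for a single row: it reads the listed input columns in order and packs their values into one feature vector, while the objective variable price stays out of it.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical plain-Java illustration of VectorAssembler's per-row behavior (not Spark code).
class AssemblerSketch {

    // Packs the values of inputCols, in the given order, into a single feature vector.
    static double[] assemble(Map<String, Double> row, String... inputCols) {
        double[] features = new double[inputCols.length];
        for (int i = 0; i < inputCols.length; i++) {
            Double value = row.get(inputCols[i]);
            if (value == null) {
                // This sketch simply rejects rows with a missing input column.
                throw new IllegalArgumentException("missing column: " + inputCols[i]);
            }
            features[i] = value;
        }
        return features;
    }

    public static void main(String[] args) {
        // Made-up values for one accessory row (not taken from gem_price_ja.csv).
        Map<String, Double> row = new LinkedHashMap<>();
        row.put("materialIndex", 2.0);
        row.put("shapeIndex", 0.0);
        row.put("brandIndex", 1.0);
        row.put("shopIndex", 3.0);
        row.put("weight", 0.8);
        row.put("price", 120000.0); // objective variable: intentionally not part of the features

        double[] features = assemble(row,
                "materialIndex", "shapeIndex", "brandIndex", "shopIndex", "weight");
        System.out.println(Arrays.toString(features)); // [2.0, 0.0, 1.0, 3.0, 0.8]
    }
}
```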

Build a learning model and predict prices

Now we are finally ready to **build a learning model** and actually **predict the prices of accessories** (regression).

Here is the code for learning and prediction:


import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.GBTRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class GBTRegressionStep03_part02 {

  public static void main(String[] args) {

    SparkSession spark = SparkSession
        .builder()
        .appName("GradientBoostingTreeRegression")
        .master("local[*]")
        .getOrCreate();

    Dataset<Row> dataset = spark
        .read()
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load("dataset/gem_price_ja.csv");

    List<String> categoricalColNames = Arrays.asList("material", "shape", "brand", "shop");

    List<StringIndexer> stringIndexers = categoricalColNames.stream()
        .map(col -> new StringIndexer()
            .setStringOrderType("frequencyDesc")
            .setInputCol(col)
            .setOutputCol(col + "Index"))
        .collect(Collectors.toList());

    String[] indexedCategoricalColNames = stringIndexers
        .stream()
        .map(StringIndexer::getOutputCol)
        .toArray(String[]::new);

    String[] numericColNames = new String[] { "weight" };

    VectorAssembler assembler = new VectorAssembler()
        .setInputCols(array(indexedCategoricalColNames, numericColNames))
        .setOutputCol("features");

    GBTRegressor gbtr = new GBTRegressor()// (1)
        .setLabelCol("price")
        .setFeaturesCol("features")
        .setPredictionCol("prediction");

    PipelineStage[] indexerStages = stringIndexers.toArray(new PipelineStage[0]);

    PipelineStage[] pipelineStages = array(indexerStages, assembler, gbtr);// (2)

    Pipeline pipeline = new Pipeline().setStages(pipelineStages);

    long seed = 0;

    Dataset<Row>[] splits = dataset.randomSplit(new double[] { 0.7, 0.3 }, seed);// (3)

    Dataset<Row> trainingData = splits[0];// (4)
    Dataset<Row> testData = splits[1];// (5)

    PipelineModel pipelineModel = pipeline.fit(trainingData);// (6)

    Dataset<Row> predictions = pipelineModel.transform(testData);// (7)

    predictions.select("id", "material", "shape", "weight", "brand", "shop", "price", "prediction").show(10);// (8)

  }

  @SuppressWarnings("unchecked")
  public static <T> T[] array(final T[] array1, final T... array2) {
    final Class<?> type1 = array1.getClass().getComponentType();
    final T[] joinedArray = (T[]) Array.newInstance(type1, array1.length + array2.length);
    System.arraycopy(array1, 0, joinedArray, 0, array1.length);
    System.arraycopy(array2, 0, joinedArray, array1.length, array2.length);
    return joinedArray;
  }
}

Code commentary

**(1)**: Creates a **gradient boosting tree** estimator for supervised learning (regression). ***setLabelCol*** specifies the column to predict (**price**), ***setFeaturesCol*** specifies the feature vector column (**features**), and ***setPredictionCol*** specifies the name of a new column (**prediction**) that stores the prediction results.

GBTRegressor gbtr = new GBTRegressor()// (1)
        .setLabelCol("price")
        .setFeaturesCol("features")
        .setPredictionCol("prediction");
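Conceptually, a gradient boosting tree builds its model as a sum of small trees: it starts from a constant prediction, repeatedly fits a new tree to the current residuals (for squared-error loss, the residuals are the negative gradient), and adds that tree scaled by a step size. The plain-Java sketch below illustrates that loop under simplifying assumptions: one numeric feature, one-split trees (stumps), a fixed learning rate, and made-up toy data. It is not how Spark's GBTRegressor is implemented; that class grows deeper trees over the whole feature vector and exposes settings such as setMaxIter, setMaxDepth, and setStepSize (it also uses squared error by default).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Teaching sketch of gradient boosting for regression with squared-error loss (not Spark code).
class GradientBoostingSketch {

    // Weak learner: predicts leftValue when x <= threshold, otherwise rightValue.
    static final class Stump {
        final double threshold, leftValue, rightValue;
        Stump(double threshold, double leftValue, double rightValue) {
            this.threshold = threshold; this.leftValue = leftValue; this.rightValue = rightValue;
        }
        double predict(double x) { return x <= threshold ? leftValue : rightValue; }
    }

    final double initialPrediction;
    final List<Stump> stumps;
    final double learningRate;

    GradientBoostingSketch(double initialPrediction, List<Stump> stumps, double learningRate) {
        this.initialPrediction = initialPrediction; this.stumps = stumps; this.learningRate = learningRate;
    }

    // Pick the threshold (among observed x values) whose leaf means minimize squared error on r.
    static Stump fitStump(double[] x, double[] r) {
        Stump best = null;
        double bestError = Double.POSITIVE_INFINITY;
        for (double t : x) {
            double sumLeft = 0, sumRight = 0;
            int nLeft = 0, nRight = 0;
            for (int i = 0; i < x.length; i++) {
                if (x[i] <= t) { sumLeft += r[i]; nLeft++; } else { sumRight += r[i]; nRight++; }
            }
            double meanLeft = nLeft > 0 ? sumLeft / nLeft : 0.0;
            double meanRight = nRight > 0 ? sumRight / nRight : 0.0;
            double error = 0;
            for (int i = 0; i < x.length; i++) {
                double diff = r[i] - (x[i] <= t ? meanLeft : meanRight);
                error += diff * diff;
            }
            if (error < bestError) { bestError = error; best = new Stump(t, meanLeft, meanRight); }
        }
        return best;
    }

    static GradientBoostingSketch fit(double[] x, double[] y, int numTrees, double learningRate) {
        // Start from the constant that minimizes squared error: the mean of y.
        double initial = Arrays.stream(y).average().orElse(0.0);
        double[] current = new double[y.length];
        Arrays.fill(current, initial);
        List<Stump> stumps = new ArrayList<>();
        for (int m = 0; m < numTrees; m++) {
            // For the loss (y - F)^2 / 2, the negative gradient with respect to F is y - F.
            double[] residual = new double[y.length];
            for (int i = 0; i < y.length; i++) residual[i] = y[i] - current[i];
            Stump stump = fitStump(x, residual);
            stumps.add(stump);
            // Move every prediction a small step toward the stump's correction.
            for (int i = 0; i < y.length; i++) current[i] += learningRate * stump.predict(x[i]);
        }
        return new GradientBoostingSketch(initial, stumps, learningRate);
    }

    double predict(double x) {
        double p = initialPrediction;
        for (Stump stump : stumps) p += learningRate * stump.predict(x);
        return p;
    }

    double meanSquaredError(double[] x, double[] y) {
        double sum = 0;
        for (int i = 0; i < y.length; i++) { double d = y[i] - predict(x[i]); sum += d * d; }
        return sum / y.length;
    }

    public static void main(String[] args) {
        // Hypothetical toy data: one feature (think "weight") and a price-like target.
        double[] x = { 1, 2, 3, 4, 5, 6 };
        double[] y = { 10, 12, 30, 32, 50, 52 };
        GradientBoostingSketch model = fit(x, y, 50, 0.1);
        System.out.println("training MSE: " + model.meanSquaredError(x, y));
        System.out.println("prediction at x=3.5: " + model.predict(3.5));
    }
}
```

A small learning rate means each tree corrects only part of the remaining error, so more trees are needed, but the fit tends to be smoother than taking full steps.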

**(2)**: Adds the learner (**gbtr**) to the Pipeline. Now, in addition to indexing the categorical variables and assembling the feature vector, the pipeline also trains the learner when it is fitted.

PipelineStage[] pipelineStages = array(indexerStages, assembler, gbtr);// (2)
Pipeline pipeline = new Pipeline().setStages(pipelineStages);

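**(3)**: ***dataset.randomSplit*** randomly splits the original Dataset at a ratio of roughly 70%:30%. Because rows are assigned at random, the part sizes are approximate; the fixed seed (here 0) lets repeated runs over the same data produce the same split.

**(4)**: The 70% portion of the split is used as training data.

**(5)**: The 30% portion of the split is used as test data.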
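The plain-Java sketch below shows the idea of a seeded random split: each row gets one uniform random draw and lands in a part with probability proportional to that part's weight, so the sizes are only close to 70% and 30%, and the same seed gives the same split. The names are hypothetical, and this is a simplified stand-in for illustration, not Spark's actual sampling code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical plain-Java illustration of a seeded, weighted random split (not Spark code).
class RandomSplitSketch {

    static <T> List<List<T>> randomSplit(List<T> rows, double[] weights, long seed) {
        double total = 0;
        for (double w : weights) {
            total += w;
        }
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < weights.length; i++) {
            parts.add(new ArrayList<>());
        }
        Random random = new Random(seed);
        for (T row : rows) {
            // One uniform draw per row, compared against the cumulative weights (scaled by their total).
            double u = random.nextDouble() * total;
            int part = weights.length - 1; // fallback in case of floating-point round-off
            double cumulative = 0;
            for (int i = 0; i < weights.length; i++) {
                cumulative += weights[i];
                if (u < cumulative) {
                    part = i;
                    break;
                }
            }
            parts.get(part).add(row);
        }
        return parts;
    }

    public static void main(String[] args) {
        List<Integer> rowIds = new ArrayList<>();
        for (int id = 0; id < 500; id++) {
            rowIds.add(id);
        }
        List<List<Integer>> splits = randomSplit(rowIds, new double[] { 0.7, 0.3 }, 0L);
        // Sizes are close to, but usually not exactly, 350 and 150.
        System.out.println(splits.get(0).size() + " / " + splits.get(1).size());
    }
}
```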

**(6)**: ***Pipeline.fit*** feeds the training data through the pipeline and trains the learner. After training, you get a ***PipelineModel*** (pipelineModel) as the trained **learning model** (machine learning model). You can use this **learning model** for regression (predicting **price**).

PipelineModel pipelineModel = pipeline.fit(trainingData);// (6)

**(7)**: ***pipelineModel.transform(testData)*** runs **prediction** (regression) with the trained model! It predicts the prices of the accessories in the test data given here and returns a Dataset containing the prediction results (specifically, a **prediction** column).

Dataset<Row> predictions = pipelineModel.transform(testData);// (7)

**(8)**: Displays the results. The columns to display are chosen with ***select***, and ***show*** prints the first 10 rows.

The prediction results are as follows.

(Figure: first 10 rows of the prediction results)

The **prediction** column at the far right of the table and the **price** column second from the right are, respectively, the prediction made by machine learning and the correct answer contained in the original data.


The gap between **prediction** and **price** is the error between the prediction and the correct answer, and I think you can see that the model predicts values fairly close to the actual prices.
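To put a number on the gap described above, one simple option is to take the per-row difference (prediction minus price) and average its absolute value. The sketch below does this in plain Java on made-up numbers; the class and method names are hypothetical, and the values are not the article's actual output.

```java
// Hypothetical helper (not Spark code): per-row gaps between prediction and price, and their mean size.
class PredictionGapSketch {

    static double[] differences(double[] prediction, double[] price) {
        if (prediction.length != price.length) {
            throw new IllegalArgumentException("length mismatch");
        }
        double[] diff = new double[price.length];
        for (int i = 0; i < price.length; i++) {
            diff[i] = prediction[i] - price[i];
        }
        return diff;
    }

    static double meanAbsoluteDifference(double[] prediction, double[] price) {
        double sum = 0;
        for (double d : differences(prediction, price)) {
            sum += Math.abs(d);
        }
        return sum / price.length;
    }

    public static void main(String[] args) {
        // Made-up values for three test rows.
        double[] prediction = { 105000, 48000, 210000 };
        double[] price = { 100000, 50000, 200000 };
        System.out.println(java.util.Arrays.toString(differences(prediction, price)));
        System.out.println("mean absolute difference: " + meanAbsoluteDifference(prediction, price));
    }
}
```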

I haven't done any particular tuning, but we now have a simple "accessory price estimation engine" ^_^

**Continued next time**

Next time, I would like to cover **evaluation metrics for learning results, hyperparameter tuning, and grid search**.
