In this article, we will use a library called Kedro to build a workflow for Titanic survival prediction.
Recently, many kinds of machine learning workflow management and pipeline-building tools have appeared, and you have probably heard of at least a few of them.
Why not compare these tools on a common prediction project? That idea is what prompted this article.
So, let's start with Kedro.
Here's why I chose Kedro:
- It was popular on Twitter for a while because it is easy to use.
- The quick-start project and the [Tutorial](https://kedro.readthedocs.io/en/stable/03_tutorial/01_workflow.html) are well written, so it seemed easy to get started.
Kedro is a machine learning workflow management tool for Python, developed by QuantumBlack, a data analytics company under McKinsey.
As listed below, it is useful for experiment management at the PoC stage; on the other hand, its features for moving to production are weaker.
What it can do:

- Smooth experimental development of machine learning models
  - Creates directories and Python code templates
  - Runs data processing and model building from the command line, based on pipelines written in its prescribed format
  - Manages output results and intermediate data objects

What is nice about it:

- The directory structure and code format are unified
- Data and intermediate objects are easy to manage
  - Just write them in catalog.yml and they become easy to load and are saved automatically
- Parameters are easy to specify in text form
  - Write them in parameters.yml and you can easily read them by their string key

What it seems weak at:

- Reloading models and preprocessing objects
  - What should I do when I want to run inference on new data?
  - In particular, can that be done while also specifying a version?
- Customizing the directory structure and the like
- Comparing accuracy between different models
  - It seems to be complementary to MLflow here
- Deploying to a production environment and managing/monitoring job execution
  - This seems to be a different direction from Airflow and Luigi
Below, I would like to introduce how to use Kedro, using the Titanic dataset, a familiar example in data analysis.
It basically follows the official tutorial. The code has been uploaded to GitHub.
- `kedro new`
  - Automatically creates the project directory
- `kedro run`
  - If you configure catalog.yml, intermediate files are saved
  - Model versioning is also possible

When you run `kedro new` from the command line, you are prompted for a project name, repository name, and package name, as shown below.
$ kedro new
Project Name:
=============
Please enter a human readable name for your new project.
Spaces and punctuation are allowed.
[New Kedro Project]: Titanic with Kedro
Repository Name:
================
Please enter a directory name for your new project repository.
Alphanumeric characters, hyphens and underscores are allowed.
Lowercase is recommended.
[titanic-with-kedro]: titanic-with-kedro
Python Package Name:
====================
Please enter a valid Python package name for your project package.
Alphanumeric characters and underscores are allowed.
Lowercase is recommended. Package name must start with a letter or underscore.
[titanic_with_kedro]: titanic_with_kedro
Generate Example Pipeline:
==========================
Do you want to generate an example pipeline in your project?
Good for first-time users. (default=N)
[y/N]: N
The project directory will be created with the name you entered for the Repository Name.
Looking inside, it looks like the following.
titanic-with-kedro
├── README.md
├── conf
│ ├── README.md
│ ├── base
│ │ ├── catalog.yml
│ │ ├── credentials.yml
│ │ ├── logging.yml
│ │ └── parameters.yml
│ └── local
├── data
│ ├── 01_raw
│ ├── 02_intermediate
│ ├── 03_primary
│ ├── 04_features
│ ├── 05_model_input
│ ├── 06_models
│ ├── 07_model_output
│ └── 08_reporting
├── docs
│ └── source
│ ├── conf.py
│ └── index.rst
├── errors.log
├── info.log
├── kedro_cli.py
├── logs
├── notebooks
├── references
├── results
├── setup.cfg
└── src
├── requirements.txt
├── setup.py
├── tests
│ ├── __init__.py
│ └── test_run.py
└── titanic_with_kedro
├── __init__.py
├── nodes
│ └── __init__.py
├── pipeline.py
├── pipelines
│ └── __init__.py
└── run.py
The 'Python Package Name' asked for along the way is used as the name of the directory under `src` where the generated pipeline code is placed (here, `titanic_with_kedro`).
Finally, you are asked 'Do you want to generate an example pipeline in your project?'. If you answer y, a set of tutorial code is generated as well. It is not needed from the second time onward, so enter N (or simply press Enter, since N is the default).
This time, I will fetch the data from Kaggle using the API and put it in `data/01_raw/`.
Registering with Kaggle and obtaining an authentication token are required separately.
$ cd data/01_raw/
$ kaggle competitions download -c titanic
$ unzip titanic.zip
In addition to this, edit the data catalog as follows.
conf/base/catalog.yml
train:
  type: CSVLocalDataSet
  filepath: data/01_raw/train.csv

test:
  type: CSVLocalDataSet
  filepath: data/01_raw/test.csv
The data names `train` and `test` are also used when loading the data.
With `type:`, you can choose one of the data-loading formats that Kedro provides out of the box.[^2]

[^2]: You can also plug in your own custom data-loading implementation.
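For reference, a custom dataset is typically a small class. The following is only a rough sketch of my own: the exact base-class interface depends on your Kedro version (`kedro.io.AbstractDataSet` with `_load`/`_save`/`_describe` is assumed here), and `TsvLocalDataSet` is a made-up name.

```python
import pandas as pd
from kedro.io import AbstractDataSet


class TsvLocalDataSet(AbstractDataSet):
    """Hypothetical example: load/save tab-separated files."""

    def __init__(self, filepath: str):
        self._filepath = filepath

    def _load(self) -> pd.DataFrame:
        return pd.read_csv(self._filepath, sep="\t")

    def _save(self, data: pd.DataFrame) -> None:
        data.to_csv(self._filepath, sep="\t", index=False)

    def _describe(self) -> dict:
        return dict(filepath=self._filepath)
```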
Let's check that the data can be read.
Run the following Kedro command to launch a Jupyter Notebook (or IPython). The notebook starts with a `catalog` object for loading data already imported.[^3]
$ kedro jupyter notebook
df_train = catalog.load("train")
df_train.head()
[^3]: By default, this may fail with a module import error. In that case, open `src/<project-name>/pipeline.py` and delete the imports of the tutorial modules.
Describe the data preprocessing pipeline according to the kedro format.
src/titanic_with_kedro/pipelines/data_engineering/pipeline.py
from kedro.pipeline import node, Pipeline
from titanic_with_kedro.nodes import preprocess


def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=preprocess.preprocess,
                inputs="train",
                outputs="train_prep",
                name="preprocess",
            ),
        ],
        tags=['de_tag'],
    )
Each unit of processing, a `node`, is stored in the `Pipeline` class provided by Kedro.
For the `node` function, the following are specified:

- `func`: the function that performs the processing
- `inputs`: the input data name(s)
- `outputs`: the output data name(s)
- `name`: the node name
The processing performed by a node is specified by passing a function object to the `func` argument.
In this example, the single node runs the function `preprocess()`, which imputes missing values and performs label encoding.
The data specified by the `inputs` argument is used as the input of `preprocess()`. In the example above, `train` is specified; this points to the `train` entry defined in the data catalog `conf/base/catalog.yml`, so the file is read according to the format and path described in the catalog and passed to the function.
The output object is then given the label specified by `outputs`. When this object is used in subsequent processing, it can be referred to by this label.
The preprocessing function specified in the node is shown below. Unlike the pipeline, it does not need to follow any special format.
(The function `_label_encoding()` is a helper.)
src/titanic_with_kedro/nodes/preprocess.py
import pandas as pd
from sklearn import preprocessing


def _label_encoding(df: pd.DataFrame) -> (pd.DataFrame, dict):
    df_le = df.copy()
    # Label-encode all categorical (object-dtype) columns
    list_columns_object = df_le.columns[df_le.dtypes == 'object']

    dict_encoders = {}
    for column in list_columns_object:
        le = preprocessing.LabelEncoder()
        mask_nan = df_le[column].isnull()
        df_le[column] = le.fit_transform(df_le[column].fillna('NaN'))
        df_le.loc[mask_nan, column] *= -1  # mark missing records with negative codes
        dict_encoders[column] = le

    return df_le, dict_encoders


def preprocess(df: pd.DataFrame) -> pd.DataFrame:
    df_prep = df.copy()

    drop_cols = ['Name', 'Ticket', 'PassengerId']
    df_prep = df_prep.drop(drop_cols, axis=1)

    df_prep['Age'] = df_prep['Age'].fillna(df_prep['Age'].mean())

    # Fill missing Embarked values with the most common value
    df_prep['Embarked'] = df_prep['Embarked'].fillna(df_prep['Embarked'].mode()[0])

    df_prep['Pclass'] = df_prep['Pclass'].astype(str)

    # Take the first letter of Cabin
    df_prep['Cabin'] = df_prep['Cabin'].str[0]

    # Label encoding for string columns
    df_prep, _ = _label_encoding(df_prep)

    return df_prep
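As a quick sanity check (my own addition, not part of the pipeline), the function can be applied to the training data loaded earlier, assuming the project package is importable in the notebook session launched via `kedro jupyter notebook`:

```python
from titanic_with_kedro.nodes.preprocess import preprocess

df_train = catalog.load("train")
df_prep = preprocess(df_train)
df_prep.dtypes  # all remaining columns should now be numeric
```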
You can also combine multiple pipelines.
This time I defined the model building in a separate pipeline, `src/titanic_with_kedro/pipelines/data_science/pipeline.py`.
To combine it with the preprocessing pipeline above, do the following:
src/titanic_with_kedro/pipeline.py
from typing import Dict

from kedro.pipeline import Pipeline

from titanic_with_kedro.pipelines.data_engineering import pipeline as de
from titanic_with_kedro.pipelines.data_science import pipeline as ds


def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
    """Create the project's pipeline.

    Args:
        kwargs: Ignore any additional arguments added in the future.

    Returns:
        A mapping from a pipeline name to a ``Pipeline`` object.
    """
    de_pipeline = de.create_pipeline()
    ds_pipeline = ds.create_pipeline()

    return {
        "de": de_pipeline,
        "ds": ds_pipeline,
        "__default__": de_pipeline + ds_pipeline,
    }
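Note that `+` concatenates the two pipelines. As a small check (my own sketch, assuming the project package is importable, for example inside a `kedro jupyter notebook` session), the combined pipeline can be inspected like this:

```python
from titanic_with_kedro.pipeline import create_pipelines

pipelines = create_pipelines()
# "__default__" should contain the nodes of both the "de" and "ds" pipelines
print([node.name for node in pipelines["__default__"].nodes])
```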
The model construction pipeline is defined as follows.
src/titanic_with_kedro/pipelines/data_science/pipeline.py
from kedro.pipeline import node, Pipeline
from titanic_with_kedro.nodes import modeling


def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=modeling.split_data,
                inputs=["train_prep", "parameters"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
            ),
            node(
                func=modeling.train_model,
                inputs=["X_train", "y_train"],
                outputs="clf",
            ),
            node(
                func=modeling.evaluate_model,
                inputs=["clf", "X_test", "y_test"],
                outputs=None,
            ),
        ],
        tags=["ds_tag"],
    )
In this way, you can place multiple nodes in one pipeline.
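`modeling.split_data` is shown later in this article, but `modeling.train_model` and `modeling.evaluate_model` are not. As a rough idea of their shape, here is a minimal sketch of my own (the article only says a random forest is used, so the hyperparameters and the accuracy logging are made up):

```python
import logging

import numpy as np
from sklearn.ensemble import RandomForestClassifier


def train_model(X_train: np.ndarray, y_train: np.ndarray) -> RandomForestClassifier:
    """Fit a random forest on the training split."""
    clf = RandomForestClassifier(n_estimators=100)
    clf.fit(X_train, y_train)
    return clf


def evaluate_model(clf: RandomForestClassifier, X_test: np.ndarray, y_test: np.ndarray) -> None:
    """Log the accuracy on the held-out split."""
    accuracy = clf.score(X_test, y_test)
    logging.getLogger(__name__).info("Model accuracy: %.3f", accuracy)
```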
After defining the pipeline, issue an execution command from the root of the project.
$ kedro run
By doing this, `src/<project_name>/pipeline.py` is called and the processing is executed.
At this time, if you want to save the intermediate preprocessed data and the trained model (this time a random forest), add the following to the data catalog.
conf/base/catalog.yml
train_prep:
  type: CSVLocalDataSet
  filepath: data/02_intermediate/train_prep.csv

clf:
  type: PickleLocalDataSet
  filepath: data/06_models/classifier.pickle
  versioned: true
`train_prep` and `clf` refer to the preprocessed data and the trained model, respectively. The names given in the `outputs` argument of the `node` function when the pipeline was defined are picked up as-is, and the corresponding objects are saved at the specified path in the specified format.
Also, if you set `versioned` to true, the output is saved to a different directory on each run.[^4]

[^4]: In this case, a directory named with the execution time is created under `data/06_models/classifier.pickle/` at run time, and the model is saved as, for example, `data/06_models/classifier.pickle/2020-02-22T06.26.54.486Z/classifier.pickle`.
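To double-check the saved artifacts, they can be reloaded through the catalog in a `kedro jupyter notebook` session (a sketch; for a versioned entry, the latest version should be loaded when no version is specified):

```python
# Reload the intermediate data and the trained model registered in catalog.yml
train_prep = catalog.load("train_prep")
clf = catalog.load("clf")

# `clf` is whatever estimator train_model returned, so it can predict directly
clf.predict(train_prep.drop("Survived", axis=1).values)[:10]
```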
The basic features are as described above, but Kedro also provides the following.
When writing code, I suspect many people build it up cell by cell in a Jupyter Notebook rather than writing a .py file from the start.
In that case, rewriting the code into a .py file later is tedious, but with Kedro's CLI, only the specified cells are exported to a .py file.
To do this, first launch Jupyter from Kedro's CLI.
$ kedro jupyter notebook
Next, add the `node` tag only to the cells you want to export to the .py file.
To add tags, select `View > Cell Toolbar > Tags` from the menu at the top of the screen. A tag input box appears at the top of each cell; type `node` and add it.
Then, running the following from the command line extracts only the tagged cells and generates `src/<project name>/nodes/<notebook name>.py`.
kedro jupyter convert notebooks/<notebook name>.ipynb
Reference
https://kedro.readthedocs.io/en/latest/04_user_guide/11_ipython.html
You can also read parameters specified in an external file when defining a pipeline.
Looking again at the model-building pipeline above, there is a place where `"parameters"` is specified in `inputs`.
src/titanic_with_kedro/pipelines/data_science/pipeline.py
# (abridged)
def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=modeling.split_data,
                inputs=["train_prep", "parameters"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
# (abridged)
The above is the part that splits the data into training and test sets.
`"parameters"` refers to the `parameters.yml` file under the `conf/base` directory.
conf/base/parameters.yml
test_size: 0.2
random_state: 17
By doing this, the parameters are automatically passed to the node function's argument as a dictionary object and can be referenced as shown below.
# (abridged)
def split_data(data: pd.DataFrame, parameters: Dict) -> List:
    """Splits data into training and test sets.

    Args:
        data: Source data.
        parameters: Parameters defined in parameters.yml.

    Returns:
        A list containing split data.
    """
    target_col = 'Survived'
    X = data.drop(target_col, axis=1).values
    y = data[target_col].values
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
    )
    return [X_train, X_test, y_train, y_test]
# (abridged)
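The snippet above is abridged and omits its imports; presumably it needs at least something like the following (an assumption based on the names it uses):

```python
from typing import Dict, List

import pandas as pd
from sklearn.model_selection import train_test_split
```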
Above, I introduced how to use Kedro and tried building a Titanic prediction with it.
However, the prediction flow above has some flaws. If you are used to data analysis, you may have noticed that:

- preprocessing is performed before the training and test data are separated, and
- there is no inference flow for newly arriving data.

In particular, the former carries a risk of leakage, because information from the test data is mixed into the training data when imputing missing values and label encoding.
Both problems could be solved by writing the preprocessing as a class and pickling the fitted instance so that it can be loaded and reused later.
That is not especially difficult: it should be achievable by adding the preprocessing object to a node's outputs and editing catalog.yml so that it is saved at run time.
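For illustration, a fit/transform-style preprocessor along these lines might look as follows. This is a minimal sketch of my own, not code from the article's repository; the fitted instance could be returned as a node output and registered in catalog.yml (for example as a `PickleLocalDataSet`) so it can be reloaded for inference.

```python
from typing import Dict

import pandas as pd
from sklearn import preprocessing


class TitanicPreprocessor:
    """Learns imputation values and label encoders on the training data only."""

    def fit(self, df: pd.DataFrame) -> "TitanicPreprocessor":
        self.age_mean_ = df['Age'].mean()
        self.embarked_mode_ = df['Embarked'].mode()[0]
        cleaned = self._clean(df)
        self.encoders_: Dict[str, preprocessing.LabelEncoder] = {}
        for col in cleaned.columns[cleaned.dtypes == 'object']:
            le = preprocessing.LabelEncoder()
            self.encoders_[col] = le.fit(cleaned[col].fillna('NaN'))
        return self

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        out = self._clean(df)
        for col, le in self.encoders_.items():
            # NOTE: categories unseen during fit would still need extra handling
            out[col] = le.transform(out[col].fillna('NaN'))
        return out

    def _clean(self, df: pd.DataFrame) -> pd.DataFrame:
        out = df.drop(['Name', 'Ticket', 'PassengerId'], axis=1, errors='ignore')
        out['Age'] = out['Age'].fillna(self.age_mean_)
        out['Embarked'] = out['Embarked'].fillna(self.embarked_mode_)
        out['Pclass'] = out['Pclass'].astype(str)
        out['Cabin'] = out['Cabin'].str[0]
        return out
```

A node could then return the fitted instance alongside the transformed training data, so that catalog.yml can persist both and the same object can be applied to new data later.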
However, there is a version-control problem.
If both the preprocessing object and the model are saved with versioning, how should each model be linked to the corresponding version of the preprocessing object?
For example, suppose you rewrite the preprocessing code, build a new model, and later want to run inference with the old model. If the new preprocessing code is not compatible with the old model, the preprocessing object must also be swapped back to the one used when the old model was created.
Doing this effortlessly requires some kind of tagging or run-ID assignment, and I have not confirmed whether this is possible with Kedro.[^5]

[^5]: In MLflow and Metaflow, models and intermediate objects can be loaded by specifying a run ID. Combining Kedro well with these might make it possible.
In addition, there are a few features I found myself wishing Kedro had while using it:

- Can the directory structure and code templates be customized?
- Can the output prediction accuracy be recorded and compared across models and parameters?

The former might be manageable by tinkering with the templates.
The latter is probably outside Kedro's scope, so it seems we have to combine it with other tools. Specifically, MLflow from Databricks looks like a good fit. In fact, an article by QuantumBlack, the developer of Kedro, also discusses combining it with MLflow, and there is also a library called PipelineX that combines the two.