Neulich erhielt ich die Anfrage, "das für die Datenanalyse verwendete Notebook so wie es ist in Pipeline einzubetten", aber ich konnte kein Paket finden, das dies könnte, und entschied mich, es selbst zu erstellen.
Selbst wenn ich es selbst mache, habe ich der vorhandenen Pipeline nur eine kleine Funktion hinzugefügt. Dieses Mal haben wir Kedro für Pipeline und Papermill übernommen, um Notebook so wie es ist zu integrieren.
Ich habe mich für Kedro entschieden, weil Pipeline eine einfache Struktur hat (nur Schreibfunktionen und Eingabe / Ausgabe), die Dokumentation umfangreich ist und die Lernkosten niedrig zu sein scheinen. Lernkosten sind sehr wichtig, um Vorschläge für jemanden zu machen. Auch, wie Mr. Kinuit sagt, ist das [^ kinuit-kedro] -Logo cool.
Es gibt drei Hauptmerkmale.
Die folgende Abbildung ist eine Visualisierung von Kedros Hello World-Projekt mit Kedro-Viz, wobei das Rechteck die Funktion und das abgerundete Quadrat die Daten darstellt. Es ist ein Bild, dass jedes dieser Rechtecke zu einem Notizbuch wird.
Die YAML der Pipeline ist wie folgt geschrieben. Zum Beispiel ist die Ausgabe "example_train_x" von "split_data" die Eingabe von "train_model", die den Fluss (Pfeil) der Pipeline darstellt.
conf/base/pipelines.yml
# data_engineering pipeline
data_engineering:
# split_data node
split_data:
nb:
input_path: notebooks/data_engineering/split_data.ipynb
parameters:
test_data_ratio: 0.2
inputs:
- example_iris_data
outputs:
- example_train_x
- example_train_y
- example_test_x
- example_test_y
# data_science pipeline
data_science:
# train_model node
train_model:
nb:
input_path: notebooks/data_science/train_model.ipynb
parameters:
num_iter: 10000
lr: 0.01
versioned: True
inputs:
- example_train_x
- example_train_y
outputs:
- example_model
# predict node
predict:
nb:
input_path: notebooks/data_science/predict.ipynb
versioned: True
inputs:
- example_model
- example_test_x
outputs:
- example_predictions
# report_accuracy node
report_accuracy:
nb:
input_path: notebooks/data_science/report_accuracy.ipynb
versioned: True
inputs:
- example_predictions
- example_test_y
Wenn Sie beispielsweise "pipelines.yml" wie folgt schreiben, lautet das Ausgabeziel von Notebook "data / 08_reporting / train_model # num_iter = 10000 & lr = 0.01.ipynb /
conf/base/pipelines.yml
# data_science pipeline
data_science:
# train_model node
train_model:
nb:
input_path: notebooks/data_science/train_model.ipynb
parameters:
num_iter: 10000
lr: 0.01
versioned: True
inputs:
- example_train_x
- example_train_y
outputs:
- example_model
Ich konnte es noch nicht richtig warten ... Der allgemeine Fluss ist wie folgt.
Erstellen Sie mit dem folgenden Befehl eine Umgebung aus dem Vorlagenprojekt.
$ git clone https://github.com/hrappuccino/kedro-notebook-project.git
$ cd kedro-notebook-project
$ pipenv install
$ pipenv shell
Registrieren Sie alle in der Pipeline angezeigten Daten (einschließlich Zwischenprodukte) im Datenkatalog.
conf/base/catalog.yaml
example_iris_data:
type: pandas.CSVDataSet
filepath: data/01_raw/iris.csv
example_train_x:
type: pickle.PickleDataSet
filepath: data/05_model_input/example_train_x.pkl
example_train_y:
type: pickle.PickleDataSet
filepath: data/05_model_input/example_train_y.pkl
example_test_x:
type: pickle.PickleDataSet
filepath: data/05_model_input/example_test_x.pkl
example_test_y:
type: pickle.PickleDataSet
filepath: data/05_model_input/example_test_y.pkl
example_model:
type: pickle.PickleDataSet
filepath: data/06_models/example_model.pkl
example_predictions:
type: pickle.PickleDataSet
filepath: data/07_model_output/example_predictions.pkl
Informationen zum Schreiben eines Datenkatalogs finden Sie unter Kedros Dokumente.
Grundsätzlich können Sie ein Notebook wie gewohnt erstellen, aber nur die folgenden beiden unterscheiden sich von den üblichen.
--Verwenden Sie den Datenkatalog von Kedro für die Dateneingabe und -ausgabe --Parameterisieren für Papermill
Starten Sie Jupyter Notebook / Lab von Kedro aus.
$ kedro jupyter notebook
$ kedro jupyter lab
Führen Sie den folgenden magischen Befehl im Notizbuch aus. Jetzt können Sie die globale Variable catalog
verwenden.
%reload_kedro
Um Daten zu lesen / speichern, schreiben Sie wie folgt.
data = catalog.load('example_iris_data')
catalog.save('example_train_x', train_x)
Darüber hinaus erfahren Sie unter Kedro Documents, wie Kedro mit Jupyter betrieben wird. Bitte beziehen Sie sich auf.
Um ein Notebook zu parametrisieren, markieren Sie die Zelle mit "Parametern".
Informationen hierzu finden Sie in der Papermill-Dokumentation.
Schreiben Sie die Pipeline in die folgende YAML (oben erneut veröffentlicht).
conf/base/pipelines.yaml
# data_engineering pipeline
data_engineering:
# split_data node
split_data:
nb:
input_path: notebooks/data_engineering/split_data.ipynb
parameters:
test_data_ratio: 0.2
inputs:
- example_iris_data
outputs:
- example_train_x
- example_train_y
- example_test_x
- example_test_y
# data_science pipeline
data_science:
# train_model node
train_model:
nb:
input_path: notebooks/data_science/train_model.ipynb
parameters:
num_iter: 10000
lr: 0.01
versioned: True
inputs:
- example_train_x
- example_train_y
outputs:
- example_model
# predict node
predict:
nb:
input_path: notebooks/data_science/predict.ipynb
versioned: True
inputs:
- example_model
- example_test_x
outputs:
- example_predictions
# report_accuracy node
report_accuracy:
nb:
input_path: notebooks/data_science/report_accuracy.ipynb
versioned: True
inputs:
- example_predictions
- example_test_y
Führen Sie die gesamte Pipeline / einen Teil davon aus.
$ kedro run
$ kedro run --pipeline=data_engineering
Wenn Sie die Option "--parallel" angeben, werden die Teile verarbeitet, die parallel parallelisiert werden können.
$ kedro run --parallel
Weitere Anweisungen zum Ausführen von Pipeline finden Sie in Kedros Dokumentation.
Führen Sie den folgenden Befehl aus, um auf "http: //127.0.0.1: 4141 /" zuzugreifen. Die unten gezeigte Seite wird angezeigt.
$ kedro viz
Führen Sie den folgenden Befehl aus, um auf "http: //127.0.0.1: 5000 /" zuzugreifen. Die unten gezeigte Seite wird angezeigt.
$ mlflow ui
- Hinweis: * Da ich es in einem Notizbuch ausgeführt habe, wird ein einzelnes Experiment in zwei Zeilen aufgezeichnet.
Kedro + MLflow wird auch in Kedros Blog vorgestellt.
Ich werde kurz erklären, wie es funktioniert.
Um genau zu sein, führe ich Notebook mit Papermill in einer Funktion aus.
Im Extremfall müssen Sie nur "pm.execute_notebook" ausführen. Um die Argumente "Notebook" und "Pipeline" zu trennen, werden sie in Klassen unterteilt und mit "init" und "call" empfangen. Zuerst habe ich es mit einem Abschluss implementiert, aber ich war wütend, dass es bei paralleler Verarbeitung nicht serialisiert werden konnte, also habe ich es zu einer Klasse gemacht.
__get_default_output_path
ist ein Prozess zur Versionsverwaltung der Notebook-Ausgabe durch Papermill, der später ausführlich beschrieben wird.
src/kedro_local/nodes/nodes.py
import papermill as pm
from pathlib import Path
import os, re, urllib, datetime
DEFAULT_VERSION = datetime.datetime.now().isoformat(timespec='milliseconds').replace(':', '.') + 'Z'
def _extract_dataset_name_from_log(output_text):
m = re.search('kedro.io.data_catalog - INFO - Saving data to `(\\w+)`', output_text)
return m.group(1) if m else None
class NotebookExecuter:
def __init__(self, catalog, input_path, output_path=None, parameters=None, versioned=False, version=DEFAULT_VERSION):
self.__catalog = catalog
self.__input_path = input_path
self.__parameters = parameters
self.__versioned = versioned
self.__version = version
self.__output_path = output_path or self.__get_default_output_path()
def __call__(self, *args):
nb = pm.execute_notebook(self.__input_path, self.__output_path, self.__parameters)
dataset_names = [
_extract_dataset_name_from_log(output['text'])
for cell in nb['cells'] if 'outputs' in cell
for output in cell['outputs'] if 'text' in output
]
return {dataset_name: self.__catalog.load(dataset_name) for dataset_name in dataset_names if dataset_name}
def __get_default_output_path(self):
#Siehe unten
Lesen Sie die obige YAML und erstellen Sie eine Pipeline. Grundsätzlich konvertiere ich YAML nur in ein Objekt in der Wörterbucheinschlussnotation. Die letzte Pipeline von "default" wird ausgeführt, wenn die Option "--pipeline" in "kedro run" weggelassen wird.
src/kedro_notebook_project/pipeline.py
from kedro.pipeline import Pipeline, node
from kedro_local.nodes import *
import yaml
def create_pipelines(catalog, **kwargs):
with open('conf/base/pipelines.yml') as f:
pipelines_ = yaml.safe_load(f)
pipelines = {
pipeline_name: Pipeline([
node(
NotebookExecuter(catalog, **node_['nb']),
node_['inputs'] if 'inputs' in node_ else None,
{output: output for output in node_['outputs']} if 'outputs' in node_ else None,
name=node_name,
) for node_name, node_ in nodes_.items()
]) for pipeline_name, nodes_ in pipelines_.items()
}
for pipeline_ in list(pipelines.values()):
if '__default__' not in pipelines:
pipelines['__default__'] = pipeline_
else:
pipelines['__default__'] += pipeline_
return pipelines
Es wird nur das Ausgabeziel gemäß der Definition von "pipelines.yml" neu geschrieben. Beachten Sie, dass der Dateiname zu lang ist, wenn self .__ parameters
groß ist. Früher wurde es gehasht, aber da es nicht für Menschen geeignet ist, wird es vorläufig in eine Abfragezeichenfolge konvertiert.
src/kedro_local/nodes/nodes.py
class NotebookExecuter:
#Kürzung
def __get_default_output_path(self):
name, ext = os.path.splitext(os.path.basename(self.__input_path))
if self.__parameters:
name += '#' + urllib.parse.urlencode(self.__parameters)
name += ext
output_dir = Path(os.getenv('PAPERMILL_OUTPUT_DIR', ''))
if self.__versioned:
output_dir = output_dir / name / self.__version
output_dir.mkdir(parents=True, exist_ok=True)
return str(output_dir / name)
Vielen Dank, dass Sie so weit gelesen haben. Der gesamte in diesem Artikel vorgestellte Quellcode befindet sich auf My GitHub. Wenn Sie interessiert sind, nutzen Sie es bitte und geben Sie uns Ihr Feedback.
Recommended Posts