[GO] Führen Sie eine Pipeline für maschinelles Lernen mit Cloud Dataflow (Python) aus.

In GCP wird Cloud Dataflow häufig in Pipelines für ETL-Anwendungen verwendet. Zur Erinnerung habe ich jedoch versucht, es nicht nur für die Vorverarbeitung, sondern auch für die Ausführung von Pipelines einschließlich maschinellem Lernen zu verwenden.

Was du machen willst

Ich möchte in der Lage sein, eine große Anzahl von Lern- / Vorhersageskripten, die in kleinem Maßstab verschoben wurden, einfach auszuführen, wenn in einer verteilten Umgebung eine große Menge an Lernversuchen durchgeführt wird. Es war mühsam, das von mir ausgeführte Lern- / Vorhersageskript so zu ändern, dass es jedes Mal aus der Ferne funktioniert, und eine Maschine einzurichten und zu verteilen. Ich möchte auch in der Lage sein, von der Vorverarbeitung wie der Attributerzeugung zum Lernen und zur Bewertung auf einen Schlag überzugehen. Wenn der Code des Vorverarbeitungs- und Lernteils geteilt ist, kann das Vorhersagemodell nur reproduziert werden, wenn die Code- und Zwischendatenversionen sorgfältig verwaltet werden. Wenn es als Pipeline implementiert ist, kann es in das System integriert werden. Weil es einfach zu sein scheint.

Installation

Installieren Sie Folgendes auf einem Arbeitscomputer, auf dem die jeweilige Pipeline ausgeführt oder ein Auftrag an die Cloud gesendet wird. (Beachten Sie, dass Python derzeit nur 2 Systeme unterstützt, wenn es in der Cloud ausgeführt wird.)

python


git clone https://github.com/apache/beam.git
cd beam/sdks/python/
python setup.py sdist
cd dist/
pip install apache-beam-sdk-*.tar.gz

python


pip install --upgrade google-cloud-dataflow

Ich habe 0.6.0 bzw. 0.5.5 in meiner Umgebung installiert. Installieren Sie anschließend Bibliotheken wie scicit-learn und pandas, die für die Ausführung in Ihrer Umgebung erforderlich sind.

Führen Sie die Lern- / Vorhersage-Pipeline scikit-learn aus

Betrachten wir hier die folgende hypothetische Lern- / Vorhersage-Pipeline unter Verwendung von Pandas und Scicit-Learn, die bereits in der Dataflow-Ausführungsumgebung installiert sind.

Hier werden die Daten im Voraus sowohl für das Training als auch für die Auswertung erstellt und in BigQuery abgelegt. Es wird davon ausgegangen, dass die Hyperparameter festgelegt wurden und Sie eine große Anzahl von Vorhersagemodellen gleichzeitig auswerten möchten. Es wird davon ausgegangen, dass das Lernmodell jedes Jahr neu gelernt wird, um die Verschlechterung des Lernmodells im Laufe der Zeit zu bewältigen.

In der Erläuterung werden die von der folgenden Abfrage erfassten Daten als Beispiel verwendet.

python


SELECT year,date,shop_id,sales,attr1,attr2,attr3
FROM dataset.table

Es wird angenommen, dass shop_id der eindeutige Schlüssel des Geschäfts ist, sales die Zielvariable ist und attr1-3 das Attribut ist.

Möglichkeit

Unten geben wir die Einstellungselemente der Pipeline ein.

Optionseinstellung


import apache_beam as beam
import apache_beam.transforms.window as window

options = beam.utils.pipeline_options.PipelineOptions()

google_cloud_options = options.view_as(beam.utils.pipeline_options.GoogleCloudOptions)
google_cloud_options.project = '{YOUR_PROJECT}'
google_cloud_options.job_name = 'sklearn'
google_cloud_options.staging_location = 'gs://{YOUR_BUCKET}/binaries'
google_cloud_options.temp_location = 'gs://{YOUR_BUCKET}/temp'

worker_options = options.view_as(beam.utils.pipeline_options.WorkerOptions)
worker_options.max_num_workers = 10

#options.view_as(beam.utils.pipeline_options.StandardOptions).runner = 'DirectRunner'
options.view_as(beam.utils.pipeline_options.StandardOptions).runner = 'DataflowRunner'

pipeline = beam.Pipeline(options=options)

In den Google Cloud-Optionen werden die Einstellungen beschrieben, die auf GCP ausgeführt werden sollen. Geben Sie den Speicherort der ausführbaren Datei oder der temporären Datei mit staging_location oder temp_location an.

Worker-Optionen legt den Worker fest. Standardmäßig ermittelt GCP die Konfiguration automatisch entsprechend der Last. (Das Dokument der japanischen Version besagt, dass Python nicht unterstützt wird, die englische Version jedoch, dass es unterstützt wird.) Selbst wenn die automatische Skalierung aktiviert ist, können Sie die Skalierung begrenzen, indem Sie die maximale Anzahl von Arbeitern mit max_num_worker angeben.

Standardoptionen gibt die Umgebung an, in der die Pipeline ausgeführt wird. Wenn DirectRunner angegeben ist, wird es in der jeweiligen Umgebung ausgeführt, und wenn DataflowRunner angegeben wird, wird es auf GCP ausgeführt. Es scheint gut, den Betrieb einer kleinen Workload zu überprüfen und in der Cloud auszuführen, wenn es kein Problem gibt.

Es gibt viele andere Optionseinstellungen. Überprüfen Sie die Befehlszeilenhilfe und Quellkommentare. Ich kann es schaffen

Pipeline-Definition

Die Pipeline wird beschrieben, indem jeder Prozess der Reihe nach mit einem Rohrbetreiber verbunden wird.

Pipeline


(pipeline
 | "Query data"  >> beam.Read(beam.io.BigQuerySource(query=query))
 | "Assign time" >> beam.Map(assign_timevalue)
 | "Set window"  >> beam.WindowInto(window.SlidingWindows(size=3, period=1))
 | "Assign group key" >> beam.Map(lambda v: (v["shop_id"], v))
 | "Group by group key and time window" >> beam.GroupByKey()
 | "Learn and predict"  >> beam.FlatMap(learn_predict)
 | "Write predict data" >> beam.Write(beam.io.BigQuerySink('dataset.table',
                              schema="shop_id:STRING, date:STRING, predict:FLOAT, sales:INTEGER",
                              write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)))

pipeline.run()

Im ersten Schritt werden die Daten gelesen, indem die Abfrage von BigQuery angegeben wird. Der zweite Schritt gibt den Referenzwert an, der den Bereich des Fensters für den nächsten Schritt bestimmt. Dieses Mal werden die Daten durch das Jahr geteilt. Geben Sie daher die Spalte an, in der das Jahr der einzelnen Daten angegeben ist (Details werden später beschrieben). Der dritte Schritt legt die Breite und den Abstand der Fenster fest. Dieses Mal beträgt die Breite 3 Jahre (2 Jahre Lernen, 1 Jahr Prognose) und sie wird um 1 Jahr verschoben. Stellen Sie also Größe = 3, Zeitraum = 1 ein. Sliding Windows ist ein Fenster zum Schieben, aber es gibt viele andere, z. B. Fixed Windows zum Korrigieren und Sessions für Sitzungen. Im vierten Schritt wird die Geschäfts-ID als der Schlüssel angegeben, für den Sie die Gruppe angeben möchten. Es wird nach dem Fenster und dem Schlüssel (Speicher) gruppiert, die zuvor im 5. Schritt angegeben wurden. Der sechste Schritt führt eine Lern- / Vorhersageverarbeitung für jede gruppierte Daten durch und gibt das Vorhersageergebnis zurück. Der Grund für die Verwendung von FlatMap besteht darin, dass die für jedes Store x-Fenster aggregierten Daten täglich neu verteilt und zurückgegeben werden. Im 7. Schritt wird das tägliche Vorhersageergebnis in BigQuery gespeichert. Die Pipeline wird ausgeführt, wenn die Pipeline in der letzten Stufe ausgeführt wird.

Als nächstes schauen wir uns jede Funktion an.

Eine Funktion, die einen Wert zum Teilen eines Fensters zurückgibt


def assign_timevalue(v):

    import apache_beam.transforms.window as window

    return window.TimestampedValue(v, v["year"])

Ersetzen Sie den Wert durch TimestampedValue, um den im Fenster zu verwendenden Wert anzugeben. Der erste TimestampedValue ist der Wert und der zweite ist der im Fenster verwendete Wert. Die Einschränkung hierbei ist, dass Sie den Import für Referenzpakete und Module innerhalb der Funktion angeben müssen. Es ist kein Problem, wenn Sie es zur Hand verschieben, aber in der Cloud wird diese Funktion von Arbeiterknoten verteilt und ausgeführt. Sie müssen das Paket importieren, damit es auch in der Umgebung des Worker-Knotens funktioniert. Bitte beachten Sie, dass in der Cloud nicht auf global definierte Variablen zugegriffen werden kann.

Funktion zur Vorhersage des Lernens


def learn_predict(records):

    import pandas as pd
    from sklearn.ensemble import GradientBoostingRegressor

    target_attr = 'sales'
    learn_attrs = ['attr1', 'attr2', 'attr3']

    data = pd.DataFrame(records[1])
    data[learn_attrs] = data[learn_attrs].apply(pd.to_numeric)
    data = data.fillna(0.)

    if len(data["year"].unique()) < 3:
        return [] #Tun Sie nichts für Kombinationen unter 3 Jahren

    year_max = data["year"].max()
    train = data[data["year"] <  year_max] #Vor 2 Jahren zum Lernen
    test  = data[data["year"] == year_max] #Das nächste Jahr dient der Prognoseauswertung

    model = GradientBoostingRegressor()
    model.fit(train[learn_attrs], train[target_attr])
    test.loc[:, "predict"] = model.predict(test[learn_attrs])

    return test[["shop_id","date","predict","sales"]].to_dict(orient='records')

Da Daten als Taple mit dem Schlüssel (Geschäfts-ID) und der Liste im Wörterbuchformat als Wert an die Lern- / Vorhersagefunktion übergeben werden, wird die Werteliste zum Lernen / Vorhersagen in einen Datenrahmen konvertiert. In der letzten Zeile wird die Konvertierung durchgeführt, um das Ergebnis als Wörterbuchformatliste an die Einfügung von BigQuery im letzten Teil zu übergeben.

Wenn die Pipeline auf diese Weise ausgeführt wird, werden das Vorhersageergebnis und die korrekten Antwortdaten in BigQuery eingegeben, sodass Indikatoren wie RMSE aus verschiedenen Perspektiven wie Speichern und Jahr per SQL berechnet und ausgewertet werden können.

schließlich

Das Ausführen des Lernprozesses in Dataflow ist möglicherweise eine schlechte Idee für den Zweck des Dienstes, aber ich konnte ihn verschieben. Diesmal war es eine einfache Einweg-Pipeline, die aus den in BigQuery erstellten Daten lernt und vorhersagt und das Ergebnis speichert. Sie können jedoch Datenverarbeitung usw. hinzufügen, Auswertungsdaten aus einem anderen Fluss und das Ergebnis übergeben Es scheint, dass es flexibel geändert werden kann, indem das Vorhersageergebnis und das Vorhersagemodell verzweigt und an die nachfolgende Stufe ausgegeben werden. Es wurde angenommen, dass die Hyperparameter dieses Mal bereits festgelegt wurden, aber ich möchte versuchen, die Parameteroptimierung massenparallel auszuführen.

Cloud Dataflow ist ein Dienst, der in GCP nicht viel Beachtung gefunden hat. Dataflow verwaltet jedoch persönlich den Aufbau und Betrieb des Datenflusses, was für Anwendungen, die komplexe Datenverarbeitungen wie maschinelles Lernen verarbeiten, in der Regel problematisch ist. Erwartet, wie AppEngine für Datenanalyseanwendungen zu sein.

Dieses Mal habe ich scicit-learn verwendet, das standardmäßig in Dataflow installiert ist, aber in Wirklichkeit möchten Sie verschiedene Bibliotheken verwenden. Beim nächsten Mal möchte ich das Verfahren zum Installieren einer Bibliothek am Beispiel der Installation von xgboost beschreiben.

Referenz

Recommended Posts

Führen Sie eine Pipeline für maschinelles Lernen mit Cloud Dataflow (Python) aus.
Führen Sie XGBoost mit Cloud Dataflow (Python) aus.
Erstellen Sie eine Python-Umgebung für maschinelles Lernen mit Containern
Maschinelles Lernen mit Python! Vorbereitung
Beginnend mit maschinellem Python-Lernen
Erstellen Sie mit Python eine Entwicklungsumgebung für maschinelles Lernen
Maschinelles Lernen mit Python (1) Gesamtklassifizierung
"Scraping & maschinelles Lernen mit Python" Lernnotiz
Bis Sie mit Python unter Windows 7 eine maschinelle Lernumgebung erstellen und ausführen
Führen Sie Cloud Dataflow (Python) über AppEngine aus
Verstärken Sie Bilder für maschinelles Lernen mit Python
Erstellen einer Windows 7-Umgebung für eine Einführung in das maschinelle Lernen mit Python
Maschinelles Lernen mit Python (2) Einfache Regressionsanalyse
Eine Geschichte über maschinelles Lernen mit Kyasuket
[Shakyo] Begegnung mit Python zum maschinellen Lernen
Führen Sie eine Python-Webanwendung mit Docker aus
Aufbau einer KI / maschinellen Lernumgebung mit Python
[Python] Einfache Einführung in das maschinelle Lernen mit Python (SVM)
Maschinelles Lernen beginnend mit Python Personal Memorandum Part2
Maschinelles Lernen beginnend mit Python Personal Memorandum Part1
[Python] Sammeln Sie Bilder mit Icrawler für maschinelles Lernen [1000 Blatt]
Lassen Sie uns einen Teil des maschinellen Lernens mit Python berühren
Ich habe mit der maschinellen Vorverarbeitung von Python Data begonnen
Python lernen mit ChemTHEATER 03
Führen Sie Python mit VBA aus
Python lernen mit ChemTHEATER 05-1
Führen Sie prepDE.py mit python3 aus
Führen Sie Blender mit Python aus
Python lernen mit ChemTHEATER 02
Python lernen mit ChemTHEATER 01
Cloud Run Tutorial (Python)
Führen Sie iperf mit Python aus
Ein Anfänger des maschinellen Lernens versuchte, mit Python ein Vorhersagemodell für Pferderennen zu erstellen
Erstellen Sie eine Python-Umgebung für maschinelles Lernen unter Mac OS
Fühlen wir uns wie ein Materialforscher mit maschinellem Lernen
[Python] Ich habe einen Klassifikator für Iris erstellt [Maschinelles Lernen]
Zusammenfassung des grundlegenden Ablaufs des maschinellen Lernens mit Python
Führen Sie eine Python-Datei mit relativem Import in PyCharm aus
Konstruktionsnotiz für eine maschinelle Lernumgebung von Python
Ich möchte einen Quantencomputer mit Python betreiben
Erstellen Sie mit Winsows 10 eine maschinelle Lernumgebung von Grund auf neu
MALSS (Einführung), ein Tool, das maschinelles Lernen in Python unterstützt
Ich habe versucht, mit Python Machine Learning ein Echtzeit-Modell zur Trennung von Tonquellen zu erstellen
Vorhersage der Zielzeit eines vollständigen Marathons mit maschinellem Lernen - Visual: Visualisierung von Daten mit Python-
Maschinelles Lernen mit Pokemon gelernt
Führen Sie Python mit PyCharm aus (Windows)
Führen Sie Python mit CloudFlash aus (arm926ej-s)
Verbessertes Lernen ab Python
Machen Sie eine Lotterie mit Python
Maschinelles Lernen Minesweeper mit PyTorch
Erstellen Sie eine maschinelle Lernumgebung
Führen Sie Label mit tkinter [Python] aus.
Python Machine Learning Programming> Schlüsselwörter
Iterative Verarbeitung von Python durch Chemoinfomatik gelernt
Erstellen Sie ein Verzeichnis mit Python
Versuchen Sie es mit Kaggle leicht maschinell