In GCP wird Cloud Dataflow häufig in Pipelines für ETL-Anwendungen verwendet. Zur Erinnerung habe ich jedoch versucht, es nicht nur für die Vorverarbeitung, sondern auch für die Ausführung von Pipelines einschließlich maschinellem Lernen zu verwenden.
Ich möchte in der Lage sein, eine große Anzahl von Lern- / Vorhersageskripten, die in kleinem Maßstab verschoben wurden, einfach auszuführen, wenn in einer verteilten Umgebung eine große Menge an Lernversuchen durchgeführt wird. Es war mühsam, das von mir ausgeführte Lern- / Vorhersageskript so zu ändern, dass es jedes Mal aus der Ferne funktioniert, und eine Maschine einzurichten und zu verteilen. Ich möchte auch in der Lage sein, von der Vorverarbeitung wie der Attributerzeugung zum Lernen und zur Bewertung auf einen Schlag überzugehen. Wenn der Code des Vorverarbeitungs- und Lernteils geteilt ist, kann das Vorhersagemodell nur reproduziert werden, wenn die Code- und Zwischendatenversionen sorgfältig verwaltet werden. Wenn es als Pipeline implementiert ist, kann es in das System integriert werden. Weil es einfach zu sein scheint.
Installieren Sie Folgendes auf einem Arbeitscomputer, auf dem die jeweilige Pipeline ausgeführt oder ein Auftrag an die Cloud gesendet wird. (Beachten Sie, dass Python derzeit nur 2 Systeme unterstützt, wenn es in der Cloud ausgeführt wird.)
python
git clone https://github.com/apache/beam.git
cd beam/sdks/python/
python setup.py sdist
cd dist/
pip install apache-beam-sdk-*.tar.gz
python
pip install --upgrade google-cloud-dataflow
Ich habe 0.6.0 bzw. 0.5.5 in meiner Umgebung installiert. Installieren Sie anschließend Bibliotheken wie scicit-learn und pandas, die für die Ausführung in Ihrer Umgebung erforderlich sind.
Betrachten wir hier die folgende hypothetische Lern- / Vorhersage-Pipeline unter Verwendung von Pandas und Scicit-Learn, die bereits in der Dataflow-Ausführungsumgebung installiert sind.
Hier werden die Daten im Voraus sowohl für das Training als auch für die Auswertung erstellt und in BigQuery abgelegt. Es wird davon ausgegangen, dass die Hyperparameter festgelegt wurden und Sie eine große Anzahl von Vorhersagemodellen gleichzeitig auswerten möchten. Es wird davon ausgegangen, dass das Lernmodell jedes Jahr neu gelernt wird, um die Verschlechterung des Lernmodells im Laufe der Zeit zu bewältigen.
In der Erläuterung werden die von der folgenden Abfrage erfassten Daten als Beispiel verwendet.
python
SELECT year,date,shop_id,sales,attr1,attr2,attr3
FROM dataset.table
Es wird angenommen, dass shop_id der eindeutige Schlüssel des Geschäfts ist, sales die Zielvariable ist und attr1-3 das Attribut ist.
Unten geben wir die Einstellungselemente der Pipeline ein.
Optionseinstellung
import apache_beam as beam
import apache_beam.transforms.window as window
options = beam.utils.pipeline_options.PipelineOptions()
google_cloud_options = options.view_as(beam.utils.pipeline_options.GoogleCloudOptions)
google_cloud_options.project = '{YOUR_PROJECT}'
google_cloud_options.job_name = 'sklearn'
google_cloud_options.staging_location = 'gs://{YOUR_BUCKET}/binaries'
google_cloud_options.temp_location = 'gs://{YOUR_BUCKET}/temp'
worker_options = options.view_as(beam.utils.pipeline_options.WorkerOptions)
worker_options.max_num_workers = 10
#options.view_as(beam.utils.pipeline_options.StandardOptions).runner = 'DirectRunner'
options.view_as(beam.utils.pipeline_options.StandardOptions).runner = 'DataflowRunner'
pipeline = beam.Pipeline(options=options)
In den Google Cloud-Optionen werden die Einstellungen beschrieben, die auf GCP ausgeführt werden sollen. Geben Sie den Speicherort der ausführbaren Datei oder der temporären Datei mit staging_location oder temp_location an.
Worker-Optionen legt den Worker fest. Standardmäßig ermittelt GCP die Konfiguration automatisch entsprechend der Last. (Das Dokument der japanischen Version besagt, dass Python nicht unterstützt wird, die englische Version jedoch, dass es unterstützt wird.) Selbst wenn die automatische Skalierung aktiviert ist, können Sie die Skalierung begrenzen, indem Sie die maximale Anzahl von Arbeitern mit max_num_worker angeben.
Standardoptionen gibt die Umgebung an, in der die Pipeline ausgeführt wird. Wenn DirectRunner angegeben ist, wird es in der jeweiligen Umgebung ausgeführt, und wenn DataflowRunner angegeben wird, wird es auf GCP ausgeführt. Es scheint gut, den Betrieb einer kleinen Workload zu überprüfen und in der Cloud auszuführen, wenn es kein Problem gibt.
Es gibt viele andere Optionseinstellungen. Überprüfen Sie die Befehlszeilenhilfe und Quellkommentare. Ich kann es schaffen
Die Pipeline wird beschrieben, indem jeder Prozess der Reihe nach mit einem Rohrbetreiber verbunden wird.
Pipeline
(pipeline
| "Query data" >> beam.Read(beam.io.BigQuerySource(query=query))
| "Assign time" >> beam.Map(assign_timevalue)
| "Set window" >> beam.WindowInto(window.SlidingWindows(size=3, period=1))
| "Assign group key" >> beam.Map(lambda v: (v["shop_id"], v))
| "Group by group key and time window" >> beam.GroupByKey()
| "Learn and predict" >> beam.FlatMap(learn_predict)
| "Write predict data" >> beam.Write(beam.io.BigQuerySink('dataset.table',
schema="shop_id:STRING, date:STRING, predict:FLOAT, sales:INTEGER",
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)))
pipeline.run()
Im ersten Schritt werden die Daten gelesen, indem die Abfrage von BigQuery angegeben wird. Der zweite Schritt gibt den Referenzwert an, der den Bereich des Fensters für den nächsten Schritt bestimmt. Dieses Mal werden die Daten durch das Jahr geteilt. Geben Sie daher die Spalte an, in der das Jahr der einzelnen Daten angegeben ist (Details werden später beschrieben). Der dritte Schritt legt die Breite und den Abstand der Fenster fest. Dieses Mal beträgt die Breite 3 Jahre (2 Jahre Lernen, 1 Jahr Prognose) und sie wird um 1 Jahr verschoben. Stellen Sie also Größe = 3, Zeitraum = 1 ein. Sliding Windows ist ein Fenster zum Schieben, aber es gibt viele andere, z. B. Fixed Windows zum Korrigieren und Sessions für Sitzungen. Im vierten Schritt wird die Geschäfts-ID als der Schlüssel angegeben, für den Sie die Gruppe angeben möchten. Es wird nach dem Fenster und dem Schlüssel (Speicher) gruppiert, die zuvor im 5. Schritt angegeben wurden. Der sechste Schritt führt eine Lern- / Vorhersageverarbeitung für jede gruppierte Daten durch und gibt das Vorhersageergebnis zurück. Der Grund für die Verwendung von FlatMap besteht darin, dass die für jedes Store x-Fenster aggregierten Daten täglich neu verteilt und zurückgegeben werden. Im 7. Schritt wird das tägliche Vorhersageergebnis in BigQuery gespeichert. Die Pipeline wird ausgeführt, wenn die Pipeline in der letzten Stufe ausgeführt wird.
Als nächstes schauen wir uns jede Funktion an.
Eine Funktion, die einen Wert zum Teilen eines Fensters zurückgibt
def assign_timevalue(v):
import apache_beam.transforms.window as window
return window.TimestampedValue(v, v["year"])
Ersetzen Sie den Wert durch TimestampedValue, um den im Fenster zu verwendenden Wert anzugeben. Der erste TimestampedValue ist der Wert und der zweite ist der im Fenster verwendete Wert. Die Einschränkung hierbei ist, dass Sie den Import für Referenzpakete und Module innerhalb der Funktion angeben müssen. Es ist kein Problem, wenn Sie es zur Hand verschieben, aber in der Cloud wird diese Funktion von Arbeiterknoten verteilt und ausgeführt. Sie müssen das Paket importieren, damit es auch in der Umgebung des Worker-Knotens funktioniert. Bitte beachten Sie, dass in der Cloud nicht auf global definierte Variablen zugegriffen werden kann.
Funktion zur Vorhersage des Lernens
def learn_predict(records):
import pandas as pd
from sklearn.ensemble import GradientBoostingRegressor
target_attr = 'sales'
learn_attrs = ['attr1', 'attr2', 'attr3']
data = pd.DataFrame(records[1])
data[learn_attrs] = data[learn_attrs].apply(pd.to_numeric)
data = data.fillna(0.)
if len(data["year"].unique()) < 3:
return [] #Tun Sie nichts für Kombinationen unter 3 Jahren
year_max = data["year"].max()
train = data[data["year"] < year_max] #Vor 2 Jahren zum Lernen
test = data[data["year"] == year_max] #Das nächste Jahr dient der Prognoseauswertung
model = GradientBoostingRegressor()
model.fit(train[learn_attrs], train[target_attr])
test.loc[:, "predict"] = model.predict(test[learn_attrs])
return test[["shop_id","date","predict","sales"]].to_dict(orient='records')
Da Daten als Taple mit dem Schlüssel (Geschäfts-ID) und der Liste im Wörterbuchformat als Wert an die Lern- / Vorhersagefunktion übergeben werden, wird die Werteliste zum Lernen / Vorhersagen in einen Datenrahmen konvertiert. In der letzten Zeile wird die Konvertierung durchgeführt, um das Ergebnis als Wörterbuchformatliste an die Einfügung von BigQuery im letzten Teil zu übergeben.
Wenn die Pipeline auf diese Weise ausgeführt wird, werden das Vorhersageergebnis und die korrekten Antwortdaten in BigQuery eingegeben, sodass Indikatoren wie RMSE aus verschiedenen Perspektiven wie Speichern und Jahr per SQL berechnet und ausgewertet werden können.
Das Ausführen des Lernprozesses in Dataflow ist möglicherweise eine schlechte Idee für den Zweck des Dienstes, aber ich konnte ihn verschieben. Diesmal war es eine einfache Einweg-Pipeline, die aus den in BigQuery erstellten Daten lernt und vorhersagt und das Ergebnis speichert. Sie können jedoch Datenverarbeitung usw. hinzufügen, Auswertungsdaten aus einem anderen Fluss und das Ergebnis übergeben Es scheint, dass es flexibel geändert werden kann, indem das Vorhersageergebnis und das Vorhersagemodell verzweigt und an die nachfolgende Stufe ausgegeben werden. Es wurde angenommen, dass die Hyperparameter dieses Mal bereits festgelegt wurden, aber ich möchte versuchen, die Parameteroptimierung massenparallel auszuführen.
Cloud Dataflow ist ein Dienst, der in GCP nicht viel Beachtung gefunden hat. Dataflow verwaltet jedoch persönlich den Aufbau und Betrieb des Datenflusses, was für Anwendungen, die komplexe Datenverarbeitungen wie maschinelles Lernen verarbeiten, in der Regel problematisch ist. Erwartet, wie AppEngine für Datenanalyseanwendungen zu sein.
Dieses Mal habe ich scicit-learn verwendet, das standardmäßig in Dataflow installiert ist, aber in Wirklichkeit möchten Sie verschiedene Bibliotheken verwenden. Beim nächsten Mal möchte ich das Verfahren zum Installieren einer Bibliothek am Beispiel der Installation von xgboost beschreiben.
Offizielle Dokumentation
Mr. Nakais leicht verständliche Trilogie zur Google-Datenverarbeitungstechnologie
Verteilte Datenverarbeitungsplattform FlumeJava mit MapReduce als Backend
Entwurfsmuster der durch "Cloud Dataflow" realisierten Streaming-Verarbeitung
Zugehöriger Code
Beispiel für die verteilte Verarbeitung von TensorFlow Prediction mit Cloud-Datenfluss
Spezielle Bildklassifizierung nach Cloud Machine Learning und Cloud Dataflow
Recommended Posts