(Aktualisiert nach 5/31 v2.0.0 Release)
Die Cloud-Datenfluss-Python-Version ist endlich GA, also habe ich versucht zu prüfen, ob die Vorlagenausführung mit der Python-Version durchgeführt werden kann, aber ich habe versucht, die Pipeline im Voraus zu registrieren ~~ (obwohl Parameter ab 3/23 nicht übergeben werden können) ~~ (5/31) (Es ist jetzt möglich, Parameter in der Version 2.0.0 zu übergeben.) Ich konnte von AppEngine booten und möchte die Prozedur freigeben.
Mit dieser Funktion können Sie die Datenfluss-Pipeline im Voraus in GCS registrieren und die registrierte Pipeline ausführen, indem Sie jederzeit beliebige Parameter übergeben. Durch Aufrufen der Vorlagenausführung über AppEngine können Sie die Datenverarbeitung und Analyseverarbeitung einfach über das Programm ausführen, ohne einen Server zum Starten der Pipeline selbst einrichten zu müssen. Sie können cron auch verwenden, um die Datenanalyse-Pipeline regelmäßig auszuführen.
In der Phase des Versuchs und Irrtums wird es möglicherweise nicht häufig verwendet, um die Genauigkeit durch maschinelles Lernen zu verbessern. Wenn es jedoch in den tatsächlichen Betrieb geht, ist es möglich, eine komplizierte Datenverarbeitungspipeline auszuführen, ohne sich um den Serverbetrieb sorgen zu müssen. Ich denke, dass es für den Bediener viel einfacher sein wird. Darüber hinaus scheint die Entwicklung einfacher zu sein, wenn die während des Versuchs und Irrtums verwendete Pipeline in einer ähnlichen Form in Betrieb genommen werden kann. (Neben der Schwierigkeit, ein hochpräzises Modell zu erstellen, sollte es ziemlich schwierig sein, das erstellte maschinelle Lernmodell als ein System zu gestalten, das stabil arbeitet.)
Die Ausführung der Datenflussvorlage umfasst die folgenden Schritte:
Im Folgenden möchte ich die Vorgehensweise für jeden Schritt erläutern.
Definiert eine benutzerdefinierte Optionsklasse zum Empfangen von Parametern, die von außen übergeben werden. In Beam bezieht sich das Programm auf Parameter, die zur Laufzeit von außen über die ValueProvider-Klasse übergeben werden. Die PipelineOptions-Klasse verfügt über einen eigenen Parser mit der Methode add_value_provider_argument zum Lesen von Parametern als ValueProvider. Erstellen Sie eine benutzerdefinierte Optionsklasse, die die PipelineOptions-Klasse erbt, und beschreiben Sie die Einstellungen der Parameter, die Sie dem Parser in der bei der Initialisierung aufgerufenen Methode _add_argparse_args hinzufügen möchten. Im folgenden Beispiel werden Eingabe, Ausgabe und Datum als benutzerdefinierte Parameter angegeben.
pipeline.py
import apache_beam as beam
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--input',
default="gs://{mybucket}/{pathtofile}",
help='input gcs file path')
parser.add_value_provider_argument(
'--output',
default="gs://{mybucket}/{pathtofile}",
help='output gcs file path')
parser.add_value_provider_argument(
'--date',
default="20170531",
help='today')
options = MyOptions()
Ändern Sie diese Option, um den Wert von ValueProvider bei der Verarbeitung der Pipeline zu verwenden. ValueProvider kann als Variable der zuvor erstellten benutzerdefinierten Option referenziert werden. Das Programm muss den Wert verzögert über ValueProvider abrufen und den Wert mit .get () abrufen. Wenn Sie ValueProvider in der internen Verarbeitung von PTransform oder DoFn verwenden möchten, übergeben Sie ValueProvider im Konstruktor, behalten Sie es als Instanzvariable bei und verwenden Sie intern .get (), um darauf zu verweisen. Beachten Sie, dass die von Beam bereitgestellten Klassen ReadFromText und WriteToText ValueProvider direkt als Argument übergeben können. Im folgenden Beispiel wird die durch die externe Parametereingabe angegebene Zieldatei gelesen, jede Zeile durch die durch das Datum angegebene Zeichenfolge ersetzt und die Datei in den durch die Ausgabe angegebenen Pfad geschrieben.
pipeline.py
class MyDoFn(beam.DoFn):
#Empfangen Sie ValueProvider im Konstruktor und setzen Sie die Instanzvariable
def __init__(self, date):
self._date = date
def process(self, element):
yield self._date.get() #Wert ist.get()Get by Methode
p = beam.Pipeline(options=options)
(p | "Read" >> beam.io.ReadFromText(options.input)
| "DoFn" >> beam.ParDo(MyDoFn(options.date)) #Übergeben Sie ValueProvider an den DoFn-Konstruktor
| "Write" >> beam.io.WriteToText(options.output))
p.run()
Wenn Sie den GCS-Pfad, für den die Vorlage registriert ist, zu Google Cloud Options hinzufügen und ausführen, wird stattdessen die Pipeline ausgeführt, aber die Vorlagendatei, die den Verarbeitungsinhalt der Pipeline beschreibt, wird im angegebenen GCS-Pfad registriert. Die Ausführungsumgebung sollte überall dort in Ordnung sein, wo Dataflow Runner die Pipeline ausführen kann.
pipeline.py
options = MyOptions()
#Geben Sie DataflowRunner für Runner an
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'
# template_Geben Sie den GCS-Pfad an, um die Vorlage am Speicherort zu registrieren
google_cloud_options = options.view_as(beam.options.pipeline_options.GoogleCloudOptions)
google_cloud_options.template_location = 'gs://{your bucket}/mytemplate'
#Führen Sie die Pipeline aus
p = beam.Pipeline(options=options)
~Pipeline-Verarbeitungscode~
p.run().wait_until_finish()
Bei der obigen Ausführung wird im angegebenen GCS-Pfad eine Vorlagendatei generiert, die den Verarbeitungsinhalt der Pipeline beschreibt.
Die Ausführung der registrierten Vorlage sendet Anweisungen an Google REST API. Die Google Cloud-Clientbibliothek wird anscheinend nicht unterstützt (Stand März 2017). Daher werden wir hier die Google APIs-Clientbibliothek verwenden. Installieren Sie vor der Verwendung die API-Clientbibliothek von Dataflow (v1b3 scheint die neueste Version ab dem 23. März zu sein). Wenn Sie den in der obigen Ausführung erstellten GCS-Pfad in gcsPath des Hauptteils des Anforderungsparameters angeben und ausführen, wird der Job aus der Vorlage generiert und ausgeführt. Unten finden Sie Beispielcode für Go und Python, aber Sie sollten ihn auch in anderen Sprachen aus der Bibliothek ausführen können. (Ich habe die Python-Version lokal ausprobiert, aber noch nicht in AppEngine. Bitte lassen Sie mich wissen, wenn ein Problem vorliegt.)
Go
import (
"net/http"
"golang.org/x/net/context"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/appengine"
dataflow "google.golang.org/api/dataflow/v1b3"
"google.golang.org/appengine/urlfetch"
)
//Unterlassung
func handler(w http.ResponseWriter, r *http.Request) {
c := appengine.NewContext(r)
client := &http.Client{
Transport: &oauth2.Transport{
Source: google.AppEngineTokenSource(c, "https://www.googleapis.com/auth/cloud-platform"),
Base: &urlfetch.Transport{Context: c},
},
}
service, err := dataflow.New(client)
templates := dataflow.NewProjectsTemplatesService(service)
req := &dataflow.CreateJobFromTemplateRequest{
GcsPath: "gs://{your bucket}/mytemplate",
JobName: "sklearn",
Parameters: map[string]string{
"input": "gs://{yourbucket}/{pathtofile1}",
"output": "gs://{yourbucket}/{pathtofile2}",
"date": "20170601",
},
}
job, err := templates.Create("{your project}", req).Do()
//Unterlassung
}
Python
from oauth2client.client import GoogleCredentials
from oauth2client.service_account import ServiceAccountCredentials
from apiclient.discovery import build
credentials = GoogleCredentials.get_application_default()
service = build("dataflow", "v1b3", credentials=credentials)
templates = service.projects().templates()
body = {
"environment": {
"bypassTempDirValidation": False,
"tempLocation": "gs://{your bucket}/temp",
#"serviceAccountEmail": "A String",
#"zone": "us-central1-f",
"maxWorkers": 1,
},
"gcsPath": "gs://{your bucket}/mytemplate",
"parameters": {
"input": "gs://{yourbucket}/{pathtofile1}",
"output": "gs://{yourbucket}/{pathtofile2}",
"date": "20170601",
},
"jobName": "sklearn",
}
req = templates.create(projectId="{your project}", body=body)
res = req.execute()
Es scheint, dass die einzigen erforderlichen Elemente im Body gcsPath und jobName sind. Es scheint, dass jobName eine Zeichenfolge enthalten sollte, die für den ausgeführten Job eindeutig ist. Parameter gibt den Laufzeitparameter an, den Sie zur Laufzeit an die Pipeline übergeben möchten. Die Antwort enthält die Job-ID. Behalten Sie diese bei, wenn Sie den Job später abbrechen möchten.
Die registrierte Pipeline kann übrigens auch über die Konsole ausgeführt werden. Sie können einen Job auch ausführen, indem Sie auf dem Bildschirm darunter eine benutzerdefinierte Vorlage auswählen, die von [+ RUN JOB] oben im Bildschirm der Datenflusskonsole übergeht, und den GCS-Pfad der registrierten Vorlage angeben.
Wenn Sie einen Job in der Pipeline starten, aber ein Problem finden oder ihn nur für eine bestimmte Zeit im Streaming-Modus starten möchten, müssen Sie den Job stoppen und den Job über die Vorlage starten. Geben Sie beim Stoppen den Status "JOB_STATE_CANCELLED" in derselben Dataflow-REST-API an und aktualisieren Sie den Job. Unten finden Sie ein Python-Codebeispiel.
Python
jobs= service.projects().jobs()
body = {
"requestedState": "JOB_STATE_CANCELLED"
}
req = jobs.update(projectId={your project}, jobId={job ID}, body=body)
res = req.execute()
Dadurch wird der Job abgebrochen und der gestartete Cluster gelöscht.
Sie können auch regelmäßig eine zuvor erstellte Datenanalyse-Pipeline ausführen, indem Sie cron usw. von AppEngine verwenden. Dies hat den Bereich der Datenflussnutzung nicht nur für die Vorverarbeitung in der Verifizierungsphase der Datenanalyse, sondern auch für die Datenerfassung und -verarbeitung in der Betriebsphase erweitert. Da Sie auf einfache Weise eine Pipeline erstellen können, indem Sie einen Vorverarbeitungsworkflow schreiben, der die Datenerfassung während des Betriebs auch in der Überprüfungsphase des Datenanalysesystems voraussetzt, werden die Daten, die zum Zeitpunkt der Überprüfung verwendet werden sollten, zum Zeitpunkt der Systementwicklung erfasst. Sie können es einfacher machen, Probleme wie Weinen und Wiederherstellen der Modellierung schnell zu erkennen, wenn Sie feststellen, dass die Kosten unerwartet hoch sind.
Ich denke, dass eines der Merkmale von GCP darin besteht, dass sich Anwendungsentwickler und Ingenieure für maschinelles Lernen auf die Entwicklung und Datenanalyse konzentrieren können, indem die Cloud-Seite den problematischen Aufbau und Betrieb der Infrastruktur übernimmt. Ich werde. Ich gehe davon aus, dass Dataflow die Rolle des Aufbaus und des Betriebs einer Datenverarbeitungspipeline übernehmen wird, was bei der Datenanalyse ebenso problematisch ist wie bei AppEngine bei der Entwicklung von Webanwendungen.
for Java
Führen Sie Cloud Dataflow für Java von App Engine for Go mit Laufzeitparametern aus
Recommended Posts