[GO] Führen Sie Cloud Dataflow (Python) über AppEngine aus

(Aktualisiert nach 5/31 v2.0.0 Release)

Die Cloud-Datenfluss-Python-Version ist endlich GA, also habe ich versucht zu prüfen, ob die Vorlagenausführung mit der Python-Version durchgeführt werden kann, aber ich habe versucht, die Pipeline im Voraus zu registrieren ~~ (obwohl Parameter ab 3/23 nicht übergeben werden können) ~~ (5/31) (Es ist jetzt möglich, Parameter in der Version 2.0.0 zu übergeben.) Ich konnte von AppEngine booten und möchte die Prozedur freigeben.

Was ist die Ausführung von Cloud-Datenflussvorlagen?

Mit dieser Funktion können Sie die Datenfluss-Pipeline im Voraus in GCS registrieren und die registrierte Pipeline ausführen, indem Sie jederzeit beliebige Parameter übergeben. Durch Aufrufen der Vorlagenausführung über AppEngine können Sie die Datenverarbeitung und Analyseverarbeitung einfach über das Programm ausführen, ohne einen Server zum Starten der Pipeline selbst einrichten zu müssen. Sie können cron auch verwenden, um die Datenanalyse-Pipeline regelmäßig auszuführen.

In der Phase des Versuchs und Irrtums wird es möglicherweise nicht häufig verwendet, um die Genauigkeit durch maschinelles Lernen zu verbessern. Wenn es jedoch in den tatsächlichen Betrieb geht, ist es möglich, eine komplizierte Datenverarbeitungspipeline auszuführen, ohne sich um den Serverbetrieb sorgen zu müssen. Ich denke, dass es für den Bediener viel einfacher sein wird. Darüber hinaus scheint die Entwicklung einfacher zu sein, wenn die während des Versuchs und Irrtums verwendete Pipeline in einer ähnlichen Form in Betrieb genommen werden kann. (Neben der Schwierigkeit, ein hochpräzises Modell zu erstellen, sollte es ziemlich schwierig sein, das erstellte maschinelle Lernmodell als ein System zu gestalten, das stabil arbeitet.)

Ausführungsmethode

Die Ausführung der Datenflussvorlage umfasst die folgenden Schritte:

Im Folgenden möchte ich die Vorgehensweise für jeden Schritt erläutern.

Problem bei der Verwendung von Parametern in der Pipeline behoben

Definiert eine benutzerdefinierte Optionsklasse zum Empfangen von Parametern, die von außen übergeben werden. In Beam bezieht sich das Programm auf Parameter, die zur Laufzeit von außen über die ValueProvider-Klasse übergeben werden. Die PipelineOptions-Klasse verfügt über einen eigenen Parser mit der Methode add_value_provider_argument zum Lesen von Parametern als ValueProvider. Erstellen Sie eine benutzerdefinierte Optionsklasse, die die PipelineOptions-Klasse erbt, und beschreiben Sie die Einstellungen der Parameter, die Sie dem Parser in der bei der Initialisierung aufgerufenen Methode _add_argparse_args hinzufügen möchten. Im folgenden Beispiel werden Eingabe, Ausgabe und Datum als benutzerdefinierte Parameter angegeben.

pipeline.py


import apache_beam as beam

class MyOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):

        parser.add_value_provider_argument(
            '--input',
            default="gs://{mybucket}/{pathtofile}",
            help='input gcs file path')
        
        parser.add_value_provider_argument(
            '--output',
            default="gs://{mybucket}/{pathtofile}",
            help='output gcs file path')

        parser.add_value_provider_argument(
            '--date',
            default="20170531",
            help='today')

options = MyOptions()

Ändern Sie diese Option, um den Wert von ValueProvider bei der Verarbeitung der Pipeline zu verwenden. ValueProvider kann als Variable der zuvor erstellten benutzerdefinierten Option referenziert werden. Das Programm muss den Wert verzögert über ValueProvider abrufen und den Wert mit .get () abrufen. Wenn Sie ValueProvider in der internen Verarbeitung von PTransform oder DoFn verwenden möchten, übergeben Sie ValueProvider im Konstruktor, behalten Sie es als Instanzvariable bei und verwenden Sie intern .get (), um darauf zu verweisen. Beachten Sie, dass die von Beam bereitgestellten Klassen ReadFromText und WriteToText ValueProvider direkt als Argument übergeben können. Im folgenden Beispiel wird die durch die externe Parametereingabe angegebene Zieldatei gelesen, jede Zeile durch die durch das Datum angegebene Zeichenfolge ersetzt und die Datei in den durch die Ausgabe angegebenen Pfad geschrieben.

pipeline.py


class MyDoFn(beam.DoFn):

    #Empfangen Sie ValueProvider im Konstruktor und setzen Sie die Instanzvariable
    def __init__(self, date):
        self._date = date
    
    def process(self, element):
        yield self._date.get() #Wert ist.get()Get by Methode


p = beam.Pipeline(options=options)

(p | "Read"  >> beam.io.ReadFromText(options.input)
   | "DoFn"  >> beam.ParDo(MyDoFn(options.date))  #Übergeben Sie ValueProvider an den DoFn-Konstruktor
   | "Write" >> beam.io.WriteToText(options.output))

p.run()

Registrieren Sie die Pipeline bei GCS

Wenn Sie den GCS-Pfad, für den die Vorlage registriert ist, zu Google Cloud Options hinzufügen und ausführen, wird stattdessen die Pipeline ausgeführt, aber die Vorlagendatei, die den Verarbeitungsinhalt der Pipeline beschreibt, wird im angegebenen GCS-Pfad registriert. Die Ausführungsumgebung sollte überall dort in Ordnung sein, wo Dataflow Runner die Pipeline ausführen kann.

pipeline.py


options = MyOptions()

#Geben Sie DataflowRunner für Runner an
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'

# template_Geben Sie den GCS-Pfad an, um die Vorlage am Speicherort zu registrieren
google_cloud_options = options.view_as(beam.options.pipeline_options.GoogleCloudOptions)
google_cloud_options.template_location = 'gs://{your bucket}/mytemplate'

#Führen Sie die Pipeline aus
p = beam.Pipeline(options=options)

~Pipeline-Verarbeitungscode~

p.run().wait_until_finish()

Bei der obigen Ausführung wird im angegebenen GCS-Pfad eine Vorlagendatei generiert, die den Verarbeitungsinhalt der Pipeline beschreibt.

Führen Sie die registrierte Pipeline aus

Die Ausführung der registrierten Vorlage sendet Anweisungen an Google REST API. Die Google Cloud-Clientbibliothek wird anscheinend nicht unterstützt (Stand März 2017). Daher werden wir hier die Google APIs-Clientbibliothek verwenden. Installieren Sie vor der Verwendung die API-Clientbibliothek von Dataflow (v1b3 scheint die neueste Version ab dem 23. März zu sein). Wenn Sie den in der obigen Ausführung erstellten GCS-Pfad in gcsPath des Hauptteils des Anforderungsparameters angeben und ausführen, wird der Job aus der Vorlage generiert und ausgeführt. Unten finden Sie Beispielcode für Go und Python, aber Sie sollten ihn auch in anderen Sprachen aus der Bibliothek ausführen können. (Ich habe die Python-Version lokal ausprobiert, aber noch nicht in AppEngine. Bitte lassen Sie mich wissen, wenn ein Problem vorliegt.)

Go


import (
  "net/http"
  "golang.org/x/net/context"
  "golang.org/x/oauth2"
  "golang.org/x/oauth2/google"
  "google.golang.org/appengine"
  dataflow "google.golang.org/api/dataflow/v1b3"
  "google.golang.org/appengine/urlfetch"
)
//Unterlassung
func handler(w http.ResponseWriter, r *http.Request) {
  c := appengine.NewContext(r)
  client := &http.Client{
    Transport: &oauth2.Transport{
      Source: google.AppEngineTokenSource(c, "https://www.googleapis.com/auth/cloud-platform"),
      Base:   &urlfetch.Transport{Context: c},
    },
  }

  service, err := dataflow.New(client)
  templates := dataflow.NewProjectsTemplatesService(service)
  req := &dataflow.CreateJobFromTemplateRequest{
    GcsPath: "gs://{your bucket}/mytemplate",
    JobName: "sklearn",
    Parameters: map[string]string{
      "input": "gs://{yourbucket}/{pathtofile1}",
      "output": "gs://{yourbucket}/{pathtofile2}",
      "date": "20170601",
    },
  }
  job, err := templates.Create("{your project}", req).Do()
  //Unterlassung
}

Python


from oauth2client.client import GoogleCredentials
from oauth2client.service_account import ServiceAccountCredentials
from apiclient.discovery import build

credentials = GoogleCredentials.get_application_default()
service = build("dataflow", "v1b3", credentials=credentials)
templates = service.projects().templates()

body = {
    "environment": {
        "bypassTempDirValidation": False,
        "tempLocation": "gs://{your bucket}/temp",
        #"serviceAccountEmail": "A String",
        #"zone": "us-central1-f",
        "maxWorkers": 1,
    },
    "gcsPath": "gs://{your bucket}/mytemplate",
    "parameters": {
      "input": "gs://{yourbucket}/{pathtofile1}",
      "output": "gs://{yourbucket}/{pathtofile2}",
      "date": "20170601",
    },
    "jobName": "sklearn",
}
req = templates.create(projectId="{your project}", body=body)
res = req.execute()

Es scheint, dass die einzigen erforderlichen Elemente im Body gcsPath und jobName sind. Es scheint, dass jobName eine Zeichenfolge enthalten sollte, die für den ausgeführten Job eindeutig ist. Parameter gibt den Laufzeitparameter an, den Sie zur Laufzeit an die Pipeline übergeben möchten. Die Antwort enthält die Job-ID. Behalten Sie diese bei, wenn Sie den Job später abbrechen möchten.

Die registrierte Pipeline kann übrigens auch über die Konsole ausgeführt werden. Sie können einen Job auch ausführen, indem Sie auf dem Bildschirm darunter eine benutzerdefinierte Vorlage auswählen, die von [+ RUN JOB] oben im Bildschirm der Datenflusskonsole übergeht, und den GCS-Pfad der registrierten Vorlage angeben. dataflowjob.png

Stoppen Sie die laufende Pipeline

Wenn Sie einen Job in der Pipeline starten, aber ein Problem finden oder ihn nur für eine bestimmte Zeit im Streaming-Modus starten möchten, müssen Sie den Job stoppen und den Job über die Vorlage starten. Geben Sie beim Stoppen den Status "JOB_STATE_CANCELLED" in derselben Dataflow-REST-API an und aktualisieren Sie den Job. Unten finden Sie ein Python-Codebeispiel.

Python


jobs= service.projects().jobs()

body = {
    "requestedState": "JOB_STATE_CANCELLED"
}
req = jobs.update(projectId={your project}, jobId={job ID}, body=body)
res = req.execute()

Dadurch wird der Job abgebrochen und der gestartete Cluster gelöscht.

abschließend

Sie können auch regelmäßig eine zuvor erstellte Datenanalyse-Pipeline ausführen, indem Sie cron usw. von AppEngine verwenden. Dies hat den Bereich der Datenflussnutzung nicht nur für die Vorverarbeitung in der Verifizierungsphase der Datenanalyse, sondern auch für die Datenerfassung und -verarbeitung in der Betriebsphase erweitert. Da Sie auf einfache Weise eine Pipeline erstellen können, indem Sie einen Vorverarbeitungsworkflow schreiben, der die Datenerfassung während des Betriebs auch in der Überprüfungsphase des Datenanalysesystems voraussetzt, werden die Daten, die zum Zeitpunkt der Überprüfung verwendet werden sollten, zum Zeitpunkt der Systementwicklung erfasst. Sie können es einfacher machen, Probleme wie Weinen und Wiederherstellen der Modellierung schnell zu erkennen, wenn Sie feststellen, dass die Kosten unerwartet hoch sind.

Ich denke, dass eines der Merkmale von GCP darin besteht, dass sich Anwendungsentwickler und Ingenieure für maschinelles Lernen auf die Entwicklung und Datenanalyse konzentrieren können, indem die Cloud-Seite den problematischen Aufbau und Betrieb der Infrastruktur übernimmt. Ich werde. Ich gehe davon aus, dass Dataflow die Rolle des Aufbaus und des Betriebs einer Datenverarbeitungspipeline übernehmen wird, was bei der Datenanalyse ebenso problematisch ist wie bei AppEngine bei der Entwicklung von Webanwendungen.

for Java

Führen Sie Cloud Dataflow für Java von App Engine for Go mit Laufzeitparametern aus

Recommended Posts

Führen Sie Cloud Dataflow (Python) über AppEngine aus
Führen Sie XGBoost mit Cloud Dataflow (Python) aus.
[Python] Führen Sie Flask in Google App Engine aus
Führen Sie Python aus Excel aus
Cloud Run Tutorial (Python)
Führen Sie eine Pipeline für maschinelles Lernen mit Cloud Dataflow (Python) aus.
Führen Sie das Illustrator-Skript von Python aus
So aktivieren Sie, dass Python3 Jobs ausführt, wenn Jobs von GCP Cloud Composer an Dataflow gesendet werden
Java 1 1-Unterstützung von Google App Engine
Verwenden von Cloud-Speicher aus Python3 (Einführung)
Führen Sie Aprili von Python auf Orange aus
Python-Fehlererkennung von Powershell ausgeführt
Führen Sie Python-Skripte synchron von C # aus
Führen Sie Ansible über Python mithilfe der API aus
Verwenden Sie den Cloud-Datenspeicher von Compute Engine
Führen Sie das Python-Skript in Cisco Memorandum_EEM aus
Verwenden Sie die Google Cloud Vision-API von Python
Verwenden von Djangos ImageField mit AppEngine / Python
Betreiben Sie den Cloud-Objektspeicher von Sakura von Python aus
Führen Sie Python-Skripte in C # -GUI-Anwendungen aus
Firebase: Verwenden Sie Cloud Firestore und Cloud Storage von Python
Greifen Sie über eine Compute Engine-Instanz auf den Cloud-Speicher zu
SQL zu SQL
Tweet (API 1.1) mit Google App Engine für Python
MeCab von Python
Führen Sie Python-Dateien mit Django aus HTML aus
Führen Sie Python-Skripte in Excel aus (mit xlwings).
Führen Sie die Kolben-App auf Cloud 9 und Apache Httpd aus
Führen Sie Python aus Excel VBA mit xlwings & Tutorial Supplement aus
App-Entwicklung zum Twittern in Python aus Visual Studio 2017
Stellen Sie die Django-Anwendung in Google App Engine (Python3) bereit.
Konstruktionsverfahren für die Google App Engine / Python-Entwicklungsumgebung (Ende 2014)
Schritte von der Installation von Python 3 bis zur Erstellung einer Django-App
PIL in Python unter Windows8 (für Google App Engine)
Erste Schritte mit Google App Engine für Python und PHP
SPA-Lernbeispiel (Angular2 + Bootstrap / App Engine / Python + Webapp2)
Verwendung von Django mit Google App Engine / Python
Laufzeitversion der Google App Engine / Python-Standardumgebung
Python + Selen + Safari - Führen Sie die iPhone Safari vom Mac mit dem Webdriver aus
Generieren Sie Word Cloud aus Testfalldaten mit Python3
Verwenden Sie thingspeak aus Python
Führen Sie Python mit VBA aus
Bedienen Sie Filemaker von Python aus
Verwenden Sie fließend Python
Führen Sie prepDE.py mit python3 aus
Greifen Sie über Python auf Bitcoind zu
Änderungen von Python 3.0 zu Python 3.5
Änderungen von Python 2 zu Python 3.0
Python aus oder importieren
Verwenden Sie MySQL aus Python
Installieren Sie Python von der Quelle
Führen Sie Befehle aus Python aus
Führen Sie Blender mit Python aus
Bedienen Sie Neutronen von Python!
Cloud Dataflow Super Primer
Verwenden Sie MySQL aus Python
Betreiben Sie LXC von Python aus
Manipuliere Riak aus Python
Jinja2 | Python-Vorlagen-Engine