(Mis à jour après la version v2.0.0 du 31/05)
Étant donné que la version Cloud Dataflow Python est enfin GA, j'ai essayé de voir si l'exécution du modèle pouvait être effectuée avec la version Python, mais j'ai trouvé que le pipeline pré-enregistré ~~ (bien que les paramètres ne puissent pas être transmis à partir du 23/03) ~~ (31/05) (Il est maintenant possible de passer des paramètres dans la version 2.0.0) J'ai pu démarrer à partir d'AppEngine et je souhaite partager la procédure.
Il s'agit d'une fonction qui vous permet d'enregistrer à l'avance le pipeline Dataflow dans GCS et d'exécuter le pipeline enregistré en passant des paramètres à tout moment. En appelant l'exécution de modèle via AppEngine, vous pouvez facilement exécuter le traitement des données et le traitement d'analyse à partir du programme sans avoir à configurer vous-même un serveur pour démarrer le pipeline. Vous pouvez également utiliser cron pour exécuter régulièrement le pipeline d'analyse des données.
Il peut ne pas être beaucoup utilisé au stade des essais et des erreurs pour améliorer la précision par l'apprentissage automatique, mais lorsqu'il entre en fonctionnement réel, il est possible d'exécuter un pipeline de traitement de données compliqué sans se soucier du fonctionnement du serveur. Je pense que ce sera beaucoup plus facile pour l'opérateur. De plus, il semble que le développement sera plus facile si le pipeline utilisé pendant les essais et erreurs peut être mis en service réel sous une forme proche de celle-là. (Outre la difficulté de créer un modèle de haute précision, il devrait être assez difficile de façonner le modèle d'apprentissage automatique créé comme un système fonctionnant de manière stable)
L'exécution du modèle Dataflow prend les étapes suivantes:
Ci-dessous, je voudrais expliquer la procédure pour chaque étape.
Définit une classe d'options personnalisée pour recevoir les paramètres transmis de l'extérieur. Dans Beam, le programme fait référence aux paramètres transmis de l'extérieur lors de l'exécution via la classe ValueProvider. La classe PipelineOptions a son propre analyseur avec une méthode add_value_provider_argument pour lire les paramètres en tant que ValueProvider. Créez une classe d'options personnalisée qui hérite de la classe PipelineOptions et décrivez les paramètres des paramètres que vous souhaitez ajouter à l'analyseur dans la méthode _add_argparse_args appelée lors de l'initialisation. Dans l'exemple ci-dessous, l'entrée, la sortie et la date sont spécifiées en tant que paramètres personnalisés.
pipeline.py
import apache_beam as beam
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--input',
default="gs://{mybucket}/{pathtofile}",
help='input gcs file path')
parser.add_value_provider_argument(
'--output',
default="gs://{mybucket}/{pathtofile}",
help='output gcs file path')
parser.add_value_provider_argument(
'--date',
default="20170531",
help='today')
options = MyOptions()
Dans le traitement du pipeline, modifiez-le pour que la valeur de ValueProvider soit utilisée. ValueProvider peut être référencé en tant que variable pour l'option personnalisée créée précédemment. Le programme doit obtenir la valeur différemment via ValueProvider et obtenir la valeur avec .get (). Si vous souhaitez utiliser ValueProvider dans le traitement interne PTransform ou DoFn, passez ValueProvider dans le constructeur, conservez-le en tant que variable d'instance et utilisez .get () en interne pour y faire référence. Notez que les classes ReadFromText et WriteToText fournies par Beam peuvent directement passer ValueProvider comme argument. Dans l'exemple suivant, le fichier de destination spécifié par l'entrée de paramètre externe est lu, chaque ligne est remplacée par la chaîne de caractères spécifiée par date et le fichier est écrit dans le chemin spécifié par la sortie.
pipeline.py
class MyDoFn(beam.DoFn):
#Recevoir ValueProvider dans le constructeur et définir la variable d'instance
def __init__(self, date):
self._date = date
def process(self, element):
yield self._date.get() #La valeur est.get()Obtenir par méthode
p = beam.Pipeline(options=options)
(p | "Read" >> beam.io.ReadFromText(options.input)
| "DoFn" >> beam.ParDo(MyDoFn(options.date)) #Passer ValueProvider au constructeur DoFn
| "Write" >> beam.io.WriteToText(options.output))
p.run()
Si vous ajoutez le chemin GCS auquel le modèle est enregistré dans Google Cloud Options et que vous l'exécutez, le pipeline sera exécuté à la place, mais le fichier modèle qui décrit le contenu de traitement du pipeline sera enregistré dans le chemin GCS spécifié. L'environnement d'exécution devrait convenir partout où Dataflow Runner peut exécuter le pipeline.
pipeline.py
options = MyOptions()
#Spécifiez DataflowRunner pour le coureur
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'
# template_Spécifiez le chemin GCS pour enregistrer le modèle à l'emplacement
google_cloud_options = options.view_as(beam.options.pipeline_options.GoogleCloudOptions)
google_cloud_options.template_location = 'gs://{your bucket}/mytemplate'
#Exécutez le pipeline
p = beam.Pipeline(options=options)
~Code de traitement du pipeline~
p.run().wait_until_finish()
Lorsqu'il est exécuté ci-dessus, un fichier modèle qui décrit le contenu de traitement du pipeline est généré dans le chemin GCS spécifié.
L'exécution du modèle enregistré envoie des instructions à Google REST API. La bibliothèque cliente Google Cloud ne semble pas prise en charge (à partir de mars 2017), nous allons donc utiliser la bibliothèque cliente des API Google ici. Avant de l'utiliser, installez la bibliothèque cliente d'API de Dataflow (la v1b3 semble être la dernière version du 23/03). Si vous spécifiez le chemin GCS créé lors de l'exécution ci-dessus dans gcsPath du corps du paramètre de requête et que vous l'exécutez, le travail sera généré et exécuté à partir du modèle. Vous trouverez ci-dessous un exemple de code pour Go et Python, mais vous devriez également pouvoir l'exécuter à partir de Library dans d'autres langues. (J'ai essayé la version Python localement, mais je ne l'ai pas essayée sur AppEngine, alors faites-le moi savoir s'il y a un problème)
Go
import (
"net/http"
"golang.org/x/net/context"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/appengine"
dataflow "google.golang.org/api/dataflow/v1b3"
"google.golang.org/appengine/urlfetch"
)
//Omission
func handler(w http.ResponseWriter, r *http.Request) {
c := appengine.NewContext(r)
client := &http.Client{
Transport: &oauth2.Transport{
Source: google.AppEngineTokenSource(c, "https://www.googleapis.com/auth/cloud-platform"),
Base: &urlfetch.Transport{Context: c},
},
}
service, err := dataflow.New(client)
templates := dataflow.NewProjectsTemplatesService(service)
req := &dataflow.CreateJobFromTemplateRequest{
GcsPath: "gs://{your bucket}/mytemplate",
JobName: "sklearn",
Parameters: map[string]string{
"input": "gs://{yourbucket}/{pathtofile1}",
"output": "gs://{yourbucket}/{pathtofile2}",
"date": "20170601",
},
}
job, err := templates.Create("{your project}", req).Do()
//Omission
}
Python
from oauth2client.client import GoogleCredentials
from oauth2client.service_account import ServiceAccountCredentials
from apiclient.discovery import build
credentials = GoogleCredentials.get_application_default()
service = build("dataflow", "v1b3", credentials=credentials)
templates = service.projects().templates()
body = {
"environment": {
"bypassTempDirValidation": False,
"tempLocation": "gs://{your bucket}/temp",
#"serviceAccountEmail": "A String",
#"zone": "us-central1-f",
"maxWorkers": 1,
},
"gcsPath": "gs://{your bucket}/mytemplate",
"parameters": {
"input": "gs://{yourbucket}/{pathtofile1}",
"output": "gs://{yourbucket}/{pathtofile2}",
"date": "20170601",
},
"jobName": "sklearn",
}
req = templates.create(projectId="{your project}", body=body)
res = req.execute()
Il semble que les seuls éléments requis dans le corps sont gcsPath et jobName. Il semble que jobName devrait contenir une chaîne de caractères unique au travail en cours d'exécution. paramètre spécifie le paramètre d'exécution que vous souhaitez transmettre au pipeline au moment de l'exécution. La réponse contient l'ID du travail, donc conservez-le si vous souhaitez annuler le travail ultérieurement.
À propos, le pipeline enregistré peut également être exécuté à partir de la console. Vous pouvez également exécuter un travail en sélectionnant un modèle personnalisé sur l'écran ci-dessous qui passe de [+ EXÉCUTER JOB] en haut de l'écran de la console Dataflow, et en spécifiant le chemin GCS du modèle enregistré.
Si vous démarrez une tâche dans le pipeline mais que vous trouvez un problème, ou si vous souhaitez la démarrer uniquement pendant une durée spécifiée en mode de diffusion en continu, vous devez arrêter la tâche et démarrer la tâche à partir du modèle. Lors de l'arrêt, indiquez l'état "JOB_STATE_CANCELLED" dans la même API REST Dataflow et mettez à jour le travail. Voici un exemple de code Python.
Python
jobs= service.projects().jobs()
body = {
"requestedState": "JOB_STATE_CANCELLED"
}
req = jobs.update(projectId={your project}, jobId={job ID}, body=body)
res = req.execute()
Cela annulera le travail et supprimera le cluster démarré.
Vous pouvez également exécuter périodiquement un pipeline d'analyse de données créé à l'avance à l'aide de cron, etc. à partir d'AppEngine. Cela a élargi la gamme d'utilisation des flux de données non seulement pour le prétraitement dans la phase de vérification de l'analyse des données, mais également pour la collecte et le traitement des données dans la phase d'exploitation. Comme vous pouvez facilement créer un pipeline, en écrivant un flux de travail de prétraitement qui suppose la collecte de données pendant le fonctionnement, même dans la phase de vérification du système d'analyse de données, les données qui étaient censées être utilisées au moment de la vérification sont acquises au moment du développement du système. Vous pouvez faciliter l'identification rapide des problèmes tels que les pleurs et la refonte de la modélisation lorsque vous constatez que le coût est étonnamment élevé.
Je pense que l'une des fonctionnalités de GCP est que les développeurs d'applications et les ingénieurs en apprentissage automatique peuvent se concentrer sur le développement et l'analyse des données en laissant le côté cloud prendre en charge la construction et l'exploitation des infrastructures problématiques. Je vais. Je m'attends à ce que Dataflow assume le rôle de construction et d'exploitation d'un pipeline de traitement de données, ce qui a tendance à être gênant dans l'analyse des données, tout comme AppEngine l'était dans le développement d'applications Web.
for Java
Exécutez Cloud Dataflow pour Java à partir d'App Engine for Go avec les paramètres d'exécution
Recommended Posts