Dieser Artikel ist der Artikel zum 23. Tag von Classi Adventskalender 2019.
Hallo, dies ist @tomoyanamekawa von Data AI Teil von Classi. Normalerweise baue ich eine Datenanalyseplattform auf GCP auf.
Vor kurzem gab es einen Fall, in dem ich die Daten in BigQuery gemäß den darin enthaltenen Werten in Dateien aufteilen und in GCS speichern wollte. Zu diesem Zeitpunkt wurde ich von Cloud Dataflow betreut. Es scheint Nachfrage von anderen Leuten zu geben, und es gab nur wenige Implementierungsbeispiele in Python, also werde ich es zusammenfassen.
Führen Sie täglich den Vorgang des Exportierens einer bestimmten Tabelle in BigQuery in Google Cloud Storage (GCS) aus. Ich möchte jedoch das Speicherzielverzeichnis abhängig vom Wert einer bestimmten Spalte ändern. Das Dateiformat ist json.
Reservierungstabelle in BigQuery Ich möchte für jedes Datum / jede Shop-ID wie folgt separat in GCS speichern.
Es handelt sich um einen von GCP bereitgestellten Dienst, der die ETL-Verarbeitung ohne Server durchführen kann. Hinter den Kulissen wird Apache Beam ausgeführt. Man kann also sagen, dass es sich um einen Dienst handelt, der Apache Beam ohne Server verwenden kann. Da eine Parallelverarbeitung durchgeführt werden kann, können auch große Datenmengen mit hoher Geschwindigkeit verarbeitet werden.
Es unterstützt sowohl die Stream-Verarbeitung als auch die Stapelverarbeitung, diesmal verwenden wir jedoch die Stapelverarbeitung. Weitere Informationen finden Sie auf der offiziellen Seite.
Für diejenigen, die es vorerst verwenden möchten, finde ich, dass dieses Verfahren im Präsentationsmaterial von Herrn Yuzutaso gut ist (ich verstehe es auch damit). Es wurde hochgeladen).
Cloud Dataflow verwendet eine sogenannte "Vorlage", um einen ETL-Prozess zu erstellen. Verwenden Sie für die allgemeine Verarbeitung Von Google bereitgestellte Vorlagen, um dies auf GUI-Basis zu vereinfachen. Ich kann es schaffen Diesmal kann ich jedoch nicht das tun, was ich möchte, daher erstelle ich selbst eine benutzerdefinierte Vorlage.
Übrigens kann Java oder Python als Programmiersprache verwendet werden. Dieses Mal werde ich in Python schreiben, aber Java verfügt über mehr Funktionen und Dokumentation. Wenn Sie oder Ihre Teammitglieder Java schreiben können und keine Wartungsprobleme vorliegen, ist Java meiner Meinung nach besser.
Hier ist der Inhalt der benutzerdefinierten Vorlage.
test_template.py
import os
import json
import datetime
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, GoogleCloudOptions
class JsonSink(fileio.TextSink):
def write(self, record):
self._fh.write(json.dumps(record).encode('utf8'))
self._fh.write('\n'.encode('utf8'))
if __name__ == '__main__':
now = datetime.datetime.now().strftime('%Y%m%d')
project_id = 'your_project'
dataset_name = 'your_dataset'
table_name = 'your_table'
bucket_name = 'your_bucket'
#Möglichkeit
pipeline_options = PipelineOptions()
google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
google_cloud_options.project = project_id
google_cloud_options.job_name = 'myjob'
google_cloud_options.staging_location = f'gs://{bucket_name}/staging'
google_cloud_options.temp_location = f'gs://{bucket_name}/temp'
google_cloud_options.template_location = f'gs://{bucket_name}/templates/test_template'
pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
#Erstellen einer Pipeline
pipeline = beam.Pipeline(options=pipeline_options)
(pipeline
| 'read' >> beam.io.Read(beam.io.BigQuerySource(
project=project_id,
use_standard_sql=True,
query=f'select * from `{project_id}.{dataset_name}.{table_name}`'
))
| 'write' >> beam.io.fileio.WriteToFiles(
path=f'gs://{bucket_name}/{now}',
destination=lambda record, table_name=table_name: f"shop_id_{record['shop_id']}/",
sink=JsonSink(),
file_naming=beam.io.fileio.destination_prefix_naming()
)
)
pipeline.run()
Der Punkt ist, dass wir diese Funktion Dynamische Ziele verwenden. Da der Wert für jeden Datensatz in der Variablen "Datensatz" gespeichert ist, können Sie das Ziel (Zieldateiname) für jeden Datensatz mit "Datensatz ['shop_id']" ändern.
Führen Sie diesen Befehl aus, da die erstellte Vorlage auf GCS platziert werden muss.
python -m test_template
Anschließend wird die Vorlage an dem durch "google_cloud_options.template_location" angegebenen Speicherort abgelegt. Sie können den Speicherort der Vorlage auch zur Laufzeit festlegen.
Cloud Dataflow selbst verfügt nicht über eine Scheduler-Funktion, daher muss es extern ausgeführt werden, um täglich ausgeführt zu werden. Daher werden wir dieses Mal die Ausführung ohne Server mit Cloud Scheduler + Cloud Pub / Sub + Cloud-Funktionen aktivieren.
Registrieren Sie das folgende Skript in Cloud-Funktionen. Dieses Skript führt die benutzerdefinierte Vorlage für Sie aus.
from googleapiclient.discovery import build
def main(data, context):
job = 'my_job'
dataflow = build('dataflow', 'v1b3')
request = dataflow.projects().templates().launch(
projectId='your_project',
gcsPath='gs://your_bucket/templates/test_template'
)
response = request.execute()
Auslöser für Cloud-Funktionen sind Pub / Sub. Wenn Pub / Sub als Trigger verwendet wird, müssen zwei Argumente empfangen werden, daher wird es als "main (data, context)" festgelegt.
Sie müssen lediglich ein Pub- / Unterthema erstellen, das den Auslöser darstellt, und dieses Thema täglich im Cloud Scheduler veröffentlichen.
Wenn Sie Cloud Composer oder einen Server einrichten und mit anderen Workflow-Engines oder Cron planen, können Sie eine benutzerdefinierte Vorlage über den folgenden Befehl gcloud ausführen.
gcloud dataflow jobs run my_job \
--gcs-location gs://your_bucket/templates/test_template \
--region=asia-northeast1
Der Cloud-Datenfluss ist sehr praktisch, da es erschreckend wäre, ein System zu implementieren, das eine solche Verarbeitung in kurzer Zeit in großem Maßstab durchführen kann. Es ist ein wenig teuer, daher denke ich, dass es notwendig ist, die Nutzung so zu wählen, dass sie mit Cloud Dataflow nicht xx Millionen Yen kostet.
Morgen ist @ tetsuya0617. freue mich auf!
Recommended Posts