[PYTHON] Verwenden Sie Cloud Dataflow, um das Ziel dynamisch entsprechend dem Wert der Daten zu ändern und in GCS zu speichern

Einführung

Dieser Artikel ist der Artikel zum 23. Tag von Classi Adventskalender 2019.

Hallo, dies ist @tomoyanamekawa von Data AI Teil von Classi. Normalerweise baue ich eine Datenanalyseplattform auf GCP auf.

Vor kurzem gab es einen Fall, in dem ich die Daten in BigQuery gemäß den darin enthaltenen Werten in Dateien aufteilen und in GCS speichern wollte. Zu diesem Zeitpunkt wurde ich von Cloud Dataflow betreut. Es scheint Nachfrage von anderen Leuten zu geben, und es gab nur wenige Implementierungsbeispiele in Python, also werde ich es zusammenfassen.

Dieses Ziel

Führen Sie täglich den Vorgang des Exportierens einer bestimmten Tabelle in BigQuery in Google Cloud Storage (GCS) aus. Ich möchte jedoch das Speicherzielverzeichnis abhängig vom Wert einer bestimmten Spalte ändern. Das Dateiformat ist json.

Beispiel

Reservierungstabelle in BigQuery reservationsテーブル Ich möchte für jedes Datum / jede Shop-ID wie folgt separat in GCS speichern. reservations_GCS

Abschlusszeichnung

完成図

Umgebung

Was ist Cloud-Datenfluss?

Es handelt sich um einen von GCP bereitgestellten Dienst, der die ETL-Verarbeitung ohne Server durchführen kann. Hinter den Kulissen wird Apache Beam ausgeführt. Man kann also sagen, dass es sich um einen Dienst handelt, der Apache Beam ohne Server verwenden kann. Da eine Parallelverarbeitung durchgeführt werden kann, können auch große Datenmengen mit hoher Geschwindigkeit verarbeitet werden.

Es unterstützt sowohl die Stream-Verarbeitung als auch die Stapelverarbeitung, diesmal verwenden wir jedoch die Stapelverarbeitung. Weitere Informationen finden Sie auf der offiziellen Seite.

Für diejenigen, die es vorerst verwenden möchten, finde ich, dass dieses Verfahren im Präsentationsmaterial von Herrn Yuzutaso gut ist (ich verstehe es auch damit). Es wurde hochgeladen).

Erstellen Sie eine benutzerdefinierte Vorlage

Cloud Dataflow verwendet eine sogenannte "Vorlage", um einen ETL-Prozess zu erstellen. Verwenden Sie für die allgemeine Verarbeitung Von Google bereitgestellte Vorlagen, um dies auf GUI-Basis zu vereinfachen. Ich kann es schaffen Diesmal kann ich jedoch nicht das tun, was ich möchte, daher erstelle ich selbst eine benutzerdefinierte Vorlage.

Übrigens kann Java oder Python als Programmiersprache verwendet werden. Dieses Mal werde ich in Python schreiben, aber Java verfügt über mehr Funktionen und Dokumentation. Wenn Sie oder Ihre Teammitglieder Java schreiben können und keine Wartungsprobleme vorliegen, ist Java meiner Meinung nach besser.

Hier ist der Inhalt der benutzerdefinierten Vorlage.

test_template.py


import os
import json
import datetime

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, GoogleCloudOptions

        
class JsonSink(fileio.TextSink):
    def write(self, record):
      self._fh.write(json.dumps(record).encode('utf8'))
      self._fh.write('\n'.encode('utf8'))


if __name__ == '__main__':
    now = datetime.datetime.now().strftime('%Y%m%d')
    project_id = 'your_project'
    dataset_name = 'your_dataset'
    table_name = 'your_table'
    bucket_name = 'your_bucket'

    #Möglichkeit
    pipeline_options = PipelineOptions()
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = project_id
    google_cloud_options.job_name = 'myjob'
    google_cloud_options.staging_location = f'gs://{bucket_name}/staging'
    google_cloud_options.temp_location = f'gs://{bucket_name}/temp'
    google_cloud_options.template_location = f'gs://{bucket_name}/templates/test_template'
    pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
        
    #Erstellen einer Pipeline
    pipeline = beam.Pipeline(options=pipeline_options)
    (pipeline 
        | 'read' >> beam.io.Read(beam.io.BigQuerySource(
            project=project_id, 
            use_standard_sql=True, 
            query=f'select * from `{project_id}.{dataset_name}.{table_name}`'
        ))
        | 'write' >> beam.io.fileio.WriteToFiles(
            path=f'gs://{bucket_name}/{now}',
            destination=lambda record, table_name=table_name: f"shop_id_{record['shop_id']}/",
            sink=JsonSink(),
            file_naming=beam.io.fileio.destination_prefix_naming()
        )
    )

    pipeline.run()

Der Punkt ist, dass wir diese Funktion Dynamische Ziele verwenden. Da der Wert für jeden Datensatz in der Variablen "Datensatz" gespeichert ist, können Sie das Ziel (Zieldateiname) für jeden Datensatz mit "Datensatz ['shop_id']" ändern.

Führen Sie diesen Befehl aus, da die erstellte Vorlage auf GCS platziert werden muss.

python -m test_template

Anschließend wird die Vorlage an dem durch "google_cloud_options.template_location" angegebenen Speicherort abgelegt. Sie können den Speicherort der Vorlage auch zur Laufzeit festlegen.

Lass es täglich funktionieren

Cloud Dataflow selbst verfügt nicht über eine Scheduler-Funktion, daher muss es extern ausgeführt werden, um täglich ausgeführt zu werden. Daher werden wir dieses Mal die Ausführung ohne Server mit Cloud Scheduler + Cloud Pub / Sub + Cloud-Funktionen aktivieren. schedulerの構成

Registrieren Sie das folgende Skript in Cloud-Funktionen. Dieses Skript führt die benutzerdefinierte Vorlage für Sie aus.

from googleapiclient.discovery import build
def main(data, context):
    job = 'my_job'
    dataflow = build('dataflow', 'v1b3')
    request = dataflow.projects().templates().launch(
        projectId='your_project',
        gcsPath='gs://your_bucket/templates/test_template'
    )
    response = request.execute()

Auslöser für Cloud-Funktionen sind Pub / Sub. Wenn Pub / Sub als Trigger verwendet wird, müssen zwei Argumente empfangen werden, daher wird es als "main (data, context)" festgelegt.

Sie müssen lediglich ein Pub- / Unterthema erstellen, das den Auslöser darstellt, und dieses Thema täglich im Cloud Scheduler veröffentlichen.

Wenn Sie Cloud Composer oder einen Server einrichten und mit anderen Workflow-Engines oder Cron planen, können Sie eine benutzerdefinierte Vorlage über den folgenden Befehl gcloud ausführen.

gcloud dataflow jobs run my_job \
        --gcs-location gs://your_bucket/templates/test_template \
        --region=asia-northeast1

abschließend

Der Cloud-Datenfluss ist sehr praktisch, da es erschreckend wäre, ein System zu implementieren, das eine solche Verarbeitung in kurzer Zeit in großem Maßstab durchführen kann. Es ist ein wenig teuer, daher denke ich, dass es notwendig ist, die Nutzung so zu wählen, dass sie mit Cloud Dataflow nicht xx Millionen Yen kostet.

Morgen ist @ tetsuya0617. freue mich auf!

Recommended Posts

Verwenden Sie Cloud Dataflow, um das Ziel dynamisch entsprechend dem Wert der Daten zu ändern und in GCS zu speichern
Verarbeitung zur Verwendung von notMNIST-Daten in Python (und versucht, sie zu klassifizieren)
[Python] Die Rolle des Sterns vor der Variablen. Teilen Sie den Eingabewert und weisen Sie ihn einer Variablen zu
Sortieren Sie BigQuery-Tabellen nach Daten im Datenfluss
Verwenden Sie Pillow, um das Bild transparent zu machen und nur einen Teil davon zu überlagern
So speichern Sie eine Python-Funktion im Wert eines Wörterbuchs (dict) und rufen die Funktion gemäß dem Schlüssel auf
Verwenden wir die offenen Daten von "Mamebus" in Python
Wie man Decorator in Django benutzt und wie man es macht
[Django 2.2] Sortieren und erhalten Sie den Wert des Beziehungsziels
[C / C ++] Übergeben Sie den in C / C ++ berechneten Wert an eine Python-Funktion, um den Prozess auszuführen, und verwenden Sie diesen Wert in C / C ++.
Geben Sie die Bilddaten mit Flask of Python zurück und zeichnen Sie sie in das Canvas-Element von HTML
[Python] Ändern Sie die Cache-Steuerung von Objekten, die in den Cloud-Speicher hochgeladen wurden
Ändern Sie den Einstellungswert von settings.py entsprechend der Entwicklungsumgebung
Ändern Sie das Standardausgabeziel in eine Datei in Python
Vergleich der Verwendung von Funktionen höherer Ordnung in Python 2 und 3
Ändern Sie die Lautstärke von Pepper entsprechend der Umgebung (Ton).
Django Geändert, um viele Daten gleichzeitig zu speichern
Verschrotten Sie die Liste der Go To EAT-Mitgliedsgeschäfte in der Präfektur Fukuoka und konvertieren Sie sie in CSV
Verschrotten Sie die Liste der Go To EAT-Mitgliedsspeicher in der Präfektur Niigata und konvertieren Sie sie in CSV
So geben Sie die im Django-Modell enthaltenen Daten im JSON-Format zurück und ordnen sie der Broschüre zu
So ändern Sie die Farbe nur der mit Tkinter gedrückten Taste
Verschrotten Sie den Zeitplan von Hinatazaka 46 und spiegeln Sie ihn in Google Kalender wider
Fühlen Sie sich frei, das Legendenlabel mit Seaborn in Python zu ändern
Ich möchte sowohl den Schlüssel als auch den Wert des Python-Iterators verwenden
Ich habe zusammengefasst, wie die Boot-Parameter von GRUB und GRUB2 geändert werden
Konvertieren Sie das Ergebnis von Python Optparse, um es zu diktieren und zu verwenden
[Python / Jupyter] Übersetzen Sie den Kommentar des in die Zwischenablage kopierten Programms und fügen Sie ihn in eine neue Zelle ein.
Verwenden wir Python, um die Häufigkeit der in einem Datenrahmen enthaltenen Binärdaten in einem einzelnen Balkendiagramm darzustellen.
Lesen Sie die Daten des NFC-Lesegeräts, das mit Python an Raspberry Pi 3 angeschlossen ist, und senden Sie sie mit OSC an openFrameworks
Laden Sie Daten mit einem Befehl und einer Aktualisierung auf s3 von aws hoch und löschen Sie die verwendeten Daten (unterwegs).
Von der Einführung der GoogleCloudPlatform Natural Language API bis zur Verwendung
Diagramm der Geschichte der Anzahl der Ebenen des tiefen Lernens und der Änderung der Genauigkeit
Prognostizieren Sie den Stromverbrauch in 2 Tagen und veröffentlichen Sie ihn in CSV
Ändern Sie die Sättigung und Helligkeit von Farbspezifikationen wie # ff000 in Python 2.5
Klicken Sie auf die Rakuten-Ranking-API, um das Ranking einer beliebigen Kategorie in CSV zu speichern
Konvertieren Sie eine Tabelle in CSV und laden Sie sie mit Cloud-Funktionen in den Cloud-Speicher hoch
Suchen Sie den Namen und die Daten einer freien Variablen in einem Funktionsobjekt
Starten Sie die Webkamera, machen Sie ein Standbild und speichern Sie es lokal
In Dataflow implementiert, um die hierarchische Struktur von Google Drive in Google Cloud Storage zu kopieren
Ich habe versucht, den Höhenwert von DTM in einem Diagramm anzuzeigen
[Python] Senden Sie das von der Webkamera aufgenommene Bild an den Server und speichern Sie es
[Einführung in Data Scientist] Grundlagen der wissenschaftlichen Berechnung, Datenverarbeitung und Verwendung der Grafikzeichnungsbibliothek ♬ Umgebungskonstruktion
Der tree.plot_tree von scikit-learn war sehr einfach und bequem, daher habe ich versucht, zusammenzufassen, wie man es einfach benutzt.
(Tagebuch 1) Erstellen, Durchsuchen und Registrieren von Daten in der SQL-Datenbank des Microsoft Azure-Dienstes mit Python
Eine Geschichte über einen Ingenieur, der das Emo der Kryptographie bemerkt hat und versucht, es in Python zu implementieren
GAE - Drehen Sie mit Python das Bild basierend auf den Rotationsinformationen von EXIF und laden Sie es in den Cloud-Speicher hoch.