[PYTHON] Die goldene Kombination aus Embulk und BigQuery glänzt mit Digdag noch mehr

Einführung

CYBIRD Engineer Adventskalender 2016, dieses Jahr ist auch @yuichi_komatsu für den 16. Tag verantwortlich. Ich bin ein Datenanalyseingenieur. Wir suchen auch Freunde, die sich gemeinsam widmen können! Wenn Sie interessiert sind, hier! !!

Gestern war @sakamoto_kojis "Ergebnisse aus der serverseitigen Entwicklung von Abonnements". .. Es ist ein praktischer und wertvoller Tipp, weil wir auf dem Gebiet Probleme haben! groß! !!

Dann ist es das Hauptthema.

Diese Geschichte

Letztes Jahr schrieb ich "Die Geschichte von Embulk und BigQuery ist zu golden" (ich habe dies versehentlich gelöscht, als ich einen Kommentar schrieb ... Es tut mir leid) Es gibt eine neue Kraft ** Die Kombination aus Embulk und BigQuery wird durch die Erstellung eines Workflows mit Digdag weiter verbessert! ** ** ** Es ist eine Geschichte.

Was ist Digdag?

Es ist kein Grabspiel. Es ist eine Workflow-Engine in OSS von Treasure Data der Welt. Jenkins wird von vielen unserer Abteilungen verwendet, aber im Gegensatz dazu gibt es keine GUI (in Entwicklung?). Eine Datei namens dig wird mit einer YAML-ähnlichen Beschreibung erstellt und JOB wird ausgeführt. Es gibt ähnliche Produkte wie Luigi und AirFlow, und Luigi wurde vorübergehend in der Abteilung verwendet, aber im Vergleich dazu ist es sehr intuitiv, zögert nicht und fühlt sich flexibel (individuell) an. Obwohl es ist). Sie brauchen keine Python-Power wie Luigi. .. Die Dokumentation einschließlich der Installation von Digdag finden Sie unter hier.

Verwendung (Modus)

· Lokalbetrieb ・ Servermodus ・ Client-Modus Im Moment laufen wir jedoch auf einem Server, da dieser die Anforderungen im lokalen Modus erfüllt. Dieses Mal möchte ich einen Teil der Verwendung in unserem Analyseteam vorstellen.

Dies ist also plötzlich ein Beispiel für die Einstellung.

Einstellungsbeispiel (übergeordnete Dig-Datei: main.dig)

timezone: Asia/Tokyo

schedule:
    daily>: 1:00:00

+main:
    _export:
        host: 'XXX.XXX.XXX.XXX'
        user: 'hoge'
        password: 'hoge_password'
        database: 'testdb'
        project_id: 'sample_project'
        dataset: 'hoge_dataset'

    +date:
        py>: date.SetDate.set_date

    +all_load:
        _parallel: true

        +load_log:
            !include : 'hoge/log.dig'
        +load_user:
            !include : 'hoge/user.dig'
        +load_master:
            !include : 'hoge/master.dig'

Dies ist die Dig-Datei, die beim Laden des DB (MySQL) -Protokolls eines unserer Spiele in BigQuery mit Embulk verwendet wird, und es ist die übergeordnete Dig-Datei, die die gemeinsamen Teile definiert. JOB wird geplant, indem der Scheduler im Hintergrund mit ./digdag scheduler & im Voraus ausgeführt und Schedule: wie oben beschrieben eingestellt wird. Unterhalb der Aufgabe "+ main" definiert "_export:" zunächst die danach verwendeten Variablen. Hier werden die Zugriffsinformationen von MySQL definiert, die für die Eingabe von embulk verwendet werden, sowie die project_id, der Datensatz usw. von BigQuery, die für die Ausgabe verwendet werden. Das py>: von + date erhält das Zieldatum in Python. Da sich die in der Datenbank gespeicherten Datumsdaten je nach Spiel von Unixtime und Datetime unterscheiden, kann beides angegeben werden. Als Referenz ist dieses Python-Skript ebenfalls enthalten.

__init__.py


# -*- coding: utf-8 -*-
import digdag
import time
from datetime import datetime,timedelta
from pytz import timezone

class SetDate(object):
  def set_date(self, target_date = ''):
    # target_Wenn es ein Argument für das Datum gibt
    if target_date:
        #Einleitungsbedingung
        start_datetime = datetime.strptime(target_date, '%Y-%m-%d')
        #Ausgangsbedingungen
        end_datetime = datetime.strptime(target_date, '%Y-%m-%d') + timedelta(days=1)
    # target_Wenn es kein Datumsargument gibt
    else:
        #Aktuelle Uhrzeit
        utc_now = datetime.now(timezone('UTC'))
        jst_now = datetime.now(timezone('Asia/Tokyo'))
        #Entsprechender Tag (vor 1 Tag)
        target_date = (jst_now - timedelta(days=1)).strftime('%Y-%m-%d')
        #Einleitungsbedingung
        start_datetime = datetime.strptime((jst_now - timedelta(days=1)).strftime('%Y-%m-%d'), '%Y-%m-%d')
        #Ausgangsbedingungen
        end_datetime = datetime.strptime(jst_now.strftime('%Y-%m-%d'), '%Y-%m-%d')

    #In Unixtime konvertieren
    start_unixtime = int(time.mktime(start_datetime.timetuple()))
    end_unixtime = int(time.mktime(end_datetime.timetuple()))

    #str Umwandlung
    start_datetime = str(start_datetime)
    end_datetime = str(end_datetime)

    #In Umgebungsvariable setzen
    digdag.env.store({"target_date": target_date, "start_unixtime": start_unixtime, "end_unixtime": end_unixtime, "start_datetime": start_datetime, "end_datetime": end_datetime})

Mit "digdag importieren" und "digdag.env.store" können Sie den eingestellten Wert als Umgebungsvariable verwenden. Hier werden die Datumsdaten erfasst, die in den Verknüpfungsskripten für Embulk yml und Chatwork verwendet werden. Platzieren Sie das Skript als "__init __. Py" unter dem Ausführungsverzeichnis von digdag. Im Beispiel wird es als "Datum / __ Init __. Py" platziert.

Zurück zur übergeordneten Dig-Datei, In + all_load werden die folgenden untergeordneten Aufgaben parallel ausgeführt, indem in _parallel true gesetzt wird:. Sie können auch andere Dig-Dateien mit ! Include: laden. Hier werden "log.dig", "user.dig" und "master.dig" parallel betrieben.

Unten finden Sie ein Beispiel für log.dig.

Einstellungsbeispiel (untergeordnete Dig-Datei: log.dig)

+log:

    _export:
    #----------------#
    # Config by TYPE #
    #----------------#
        process: 'log'

    +sample1_log:
        _export:
            table: 'sample1_log'
        _error:
            _export:
                status: 'ERROR'
            py>: info.ChatWork.post
        embulk>: hoge/yml/log/sample1_log.yml

    +sample2_log:
        _export:
            table: 'sample2_log'
        _error:
            _export:
                status: 'ERROR'
            py>: info.ChatWork.post
        embulk>: hoge/yml/log/sample2_log.yml


#(Weggelassen)


    +post:
    # SUCCESS info to Chatwork
        _export:
            job: 'ALL'
            status: 'SUCCESS'
        py>: info.ChatWork.post

Die Variable table wird in _export: of + sample1_log und + sample2_log gesetzt, und embulk wird ausgeführt. Die gesetzten Variablen werden in Embulks yml verwendet. Wenn dort ein Fehler auftritt, wird er mit py>: info.ChatWork.post an ChatWork gesendet, damit festgestellt werden kann, in welcher Aufgabe der Fehler aufgetreten ist. Der Job selbst wird beendet, wenn ein Fehler auftritt. digdag verwaltet die Sitzung, und wenn Sie sie in derselben Sitzung ausführen, springt digdag run main.dig zum Fehlerteil, wie er ist. Wenn Sie die Sitzung ignorieren und von vorne beginnen möchten, verwenden Sie "digdag run main.dig -a". Die technischen Daten dieses Bereichs finden Sie unter Dokumentation. Im Beispiel kann target_date als Argument festgelegt werden, sodass Sie auch digdag run main.dig -p target_date = 2016-12-10 angeben können.

Das Embulk-Yml-Beispiel (Eingabe: MySQL, Ausgabe: BigQuery) lautet wie folgt.

in:
  type: mysql
  host: ${host}
  user: ${user}
  password: "${password}"
  database: ${database}
  table: ${table}
  select: "id,action,zip_url,zip_hash,zip_created_at,zip_size,summary_flg ,image_quality,created_at,updated_at"
  where: created_at >= ${start_unixtime} AND created_at < ${end_unixtime}
out:
  type: bigquery
  mode: append
  auth_method: json_key
  json_keyfile: /home/hoge/embulk/keyfile/json_keyfile.json
  path_prefix: /home/hoge/embulk/tmp/${dataset}/${table}
  source_format: CSV
  compression: GZIP
  project: ${project_id}
  dataset: ${dataset}
  auto_create_table: true
  table: ${table}
  schema_file: /home/hoge/embulk/schema/${dataset}/${table}.json
  delete_from_local_when_job_end: true

Variablen können mit $ {Variablenname} referenziert werden. Da die Spalten von SELECT angegeben werden, wird hier auf die yml-Datei für jede Tabelle verwiesen. Wenn Sie jedoch alle Spalten auswählen möchten, können Sie sie mit einer Vorlage abdecken, um die Konfiguration zu vereinfachen. Überlegen. BigQuery-Datasets, Tabellenpartitionen usw. können bei Bedarf auch dynamisch geändert werden.

Andere

Obwohl es von unserem Analyseteam nicht verwendet wird, können seit Ver 0.8.18 oder höher Operatoren wie "bq>", "bq_load>" und "gcs_wait>" verwendet werden, sodass die Auswahlmöglichkeiten beim Laden in BigQuery groß sind Ich denke, das hat sich ausgebreitet. Nun, der Bediener scheint in der Lage zu sein, seine eigenen zu erstellen. In diesem Sinne kann man sagen, dass er alles tun kann. ..

Zusammenfassung

Mit Digdag können Sie Eltern-Kind-Beziehungen und Abhängigkeiten einfach und intuitiv definieren. Natürlich ist es perfekt mit Embulk kompatibel und Sie können eine einfache und flexible Workflow-Verarbeitung durchführen, indem Sie Variablen dynamisch erfassen und festlegen. !! Wenn Sie es mit Captain Tsubasa vergleichen, ist Digdag wie Misugi-kun, der es schafft, mit der Umgebung zusammenzuarbeiten.

Schließlich

Morgen CYBIRD Engineer Adventskalender 2016, Tag 17 [@ cy-nana-obata](http://qiita.com/cy- Dies ist Nana-Obata. Es wird das jugendliche und hoffnungsvolle Material zeigen, das es nur neuen Absolventen gibt! ?? Ich freue mich darauf! !! !!

Darüber hinaus ist das von unserer Firma bereitgestellte Fußballtrainingsspiel "BFB Champions" derzeit mit "Captain Tsubasa" sowie Tsubasa-kun und Misaki-kun verbunden Sie können die ursprüngliche goldene Kombination zusätzlich zu Elf spielen. Wenn Sie sie also noch nicht gespielt haben, probieren Sie sie bitte aus! Es gibt auch Misugi-Kun! !!

Recommended Posts

Die goldene Kombination aus Embulk und BigQuery glänzt mit Digdag noch mehr
Beziehung der Fibonacci-Zahlenreihe und des Goldenen Schnitts
Visualisieren Sie den Bereich der internen und externen Einfügungen mit Python
Sehen Sie, wie schnell Sie mit NumPy / SciPy beschleunigen können
Spielen Sie mit dem Passwortmechanismus von GitHub Webhook und Python
Kombination von rekursiv und Generator
Kombination von anyenv und direnv
[Erforderliches Thema DI] Implementieren und verstehen Sie den Mechanismus von DI mit Go
Verbesserung der Wiederverwendbarkeit und Wartbarkeit von mit Luigi erstellten Workflows