[PYTHON] La combinaison dorée d'Embulk et de BigQuery brille encore plus avec Digdag

introduction

CYBIRD Engineer Advent Calendar 2016, cette année est aussi @yuichi_komatsu en charge du 16e jour. Je suis ingénieur en analyse de données. Nous recherchons également des amis qui peuvent se consacrer ensemble! Si vous êtes intéressé, ici! !!

Hier, c'était @sakamoto_koji's "Findings gagned from server-side development of subscriptions". .. C'est un conseil pratique et précieux car nous avons eu du mal sur le terrain! génial! !!

Ensuite, c'est le sujet principal.

Cette histoire

L'année dernière, j'ai écrit "L'histoire d'embulk et BigQuery étant trop dorée" (j'ai accidentellement supprimé ceci lorsque j'ai écrit un commentaire ... je suis désolé) Il y a une nouvelle force ** La combinaison Embulk et BigQuery sera encore améliorée en créant un flux de travail avec Digdag! ** ** C'est une histoire.

Qu'est-ce que Digdag?

Ce n'est pas un jeu de fouille. C'est un moteur de workflow en OSS de Treasure Data du monde. Jenkins est utilisé par beaucoup de nos départements, mais contrairement à cela, il n'y a pas d'interface graphique (en cours de développement?), Et un fichier appelé dig est créé avec une description de type YAML et JOB est exécuté. Il existe des produits similaires tels que Luigi et AirFlow, et Luigi a été utilisé temporairement dans le département, mais comparé à cela, il est très intuitif, n'hésite pas et se sent flexible (individuel). Bien que ce soit le cas). Vous n'avez pas besoin de la puissance Python comme luigi. .. Veuillez vous référer à here pour la documentation comprenant l'installation de Digdag.

Comment utiliser (mode)

· Mode local ・ Mode serveur ・ Mode client Cependant, pour le moment, nous fonctionnons sur un serveur car il répond aux exigences en mode local. Cette fois, je voudrais vous présenter certaines des utilisations au sein de notre équipe d'analyse.

Donc, tout à coup, c'est un exemple de mise en scène.

Exemple de configuration (fichier dig parent: main.dig)

timezone: Asia/Tokyo

schedule:
    daily>: 1:00:00

+main:
    _export:
        host: 'XXX.XXX.XXX.XXX'
        user: 'hoge'
        password: 'hoge_password'
        database: 'testdb'
        project_id: 'sample_project'
        dataset: 'hoge_dataset'

    +date:
        py>: date.SetDate.set_date

    +all_load:
        _parallel: true

        +load_log:
            !include : 'hoge/log.dig'
        +load_user:
            !include : 'hoge/user.dig'
        +load_master:
            !include : 'hoge/master.dig'

Il s'agit du fichier dig utilisé lors du chargement du journal DB (MySQL) de l'un de nos jeux dans BigQuery avec embulk, et c'est le fichier dig parent qui définit les parties communes. JOB est planifié en exécutant le planificateur en arrière-plan avec ./digdag scheduler & à l'avance et en définissant schedule: comme décrit ci-dessus. Sous la tâche + main,_export:définit d'abord les variables utilisées par la suite. Ici, les informations d'accès de MySQL utilisées pour l'entrée d'embulk et le project_id, l'ensemble de données, etc. de BigQuery utilisé pour la sortie sont définis. Le py>: de + date obtient la date cible en Python. Étant donné que les données de date stockées dans la base de données diffèrent selon le jeu d'unixtime et datetime, l'une ou l'autre peut être spécifiée. Pour référence, ce script Python est également inclus.

__init__.py


# -*- coding: utf-8 -*-
import digdag
import time
from datetime import datetime,timedelta
from pytz import timezone

class SetDate(object):
  def set_date(self, target_date = ''):
    # target_S'il y a un argument pour la date
    if target_date:
        #Condition d'initiation
        start_datetime = datetime.strptime(target_date, '%Y-%m-%d')
        #Conditions de sortie
        end_datetime = datetime.strptime(target_date, '%Y-%m-%d') + timedelta(days=1)
    # target_S'il n'y a pas d'argument de date
    else:
        #Heure actuelle
        utc_now = datetime.now(timezone('UTC'))
        jst_now = datetime.now(timezone('Asia/Tokyo'))
        #Jour correspondant (il y a 1 jour)
        target_date = (jst_now - timedelta(days=1)).strftime('%Y-%m-%d')
        #Condition d'initiation
        start_datetime = datetime.strptime((jst_now - timedelta(days=1)).strftime('%Y-%m-%d'), '%Y-%m-%d')
        #Conditions de sortie
        end_datetime = datetime.strptime(jst_now.strftime('%Y-%m-%d'), '%Y-%m-%d')

    #Convertir en unixtime
    start_unixtime = int(time.mktime(start_datetime.timetuple()))
    end_unixtime = int(time.mktime(end_datetime.timetuple()))

    #conversion str
    start_datetime = str(start_datetime)
    end_datetime = str(end_datetime)

    #Définir dans la variable d'environnement
    digdag.env.store({"target_date": target_date, "start_unixtime": start_unixtime, "end_unixtime": end_unixtime, "start_datetime": start_datetime, "end_datetime": end_datetime})

En faisant ʻimport digdag et digdag.env.store`, vous pouvez utiliser la valeur définie comme variable d'environnement. Ici, les données de date utilisées dans les scripts de liaison embulk yml et Chatwork sont acquises. Placez le script comme «__init __. Py» sous le répertoire d'exécution de digdag. Dans l'exemple, il est placé sous la forme «date / __ init __. Py».

Revenant au fichier dig parent, Dans + all_load, les tâches enfants suivantes sont exécutées en parallèle en définissant true dans _parallel:. Vous pouvez également charger d'autres fichiers dig avec ! Include:. Ici, log.dig, ʻuser.dig et master.dig` fonctionnent en parallèle.

Voici un exemple de log.dig.

Exemple de configuration (fichier dig enfant: log.dig)

+log:

    _export:
    #----------------#
    # Config by TYPE #
    #----------------#
        process: 'log'

    +sample1_log:
        _export:
            table: 'sample1_log'
        _error:
            _export:
                status: 'ERROR'
            py>: info.ChatWork.post
        embulk>: hoge/yml/log/sample1_log.yml

    +sample2_log:
        _export:
            table: 'sample2_log'
        _error:
            _export:
                status: 'ERROR'
            py>: info.ChatWork.post
        embulk>: hoge/yml/log/sample2_log.yml


#(Omis)


    +post:
    # SUCCESS info to Chatwork
        _export:
            job: 'ALL'
            status: 'SUCCESS'
        py>: info.ChatWork.post

La variable table est définie dans _export: ʻof + sample1_log et + sample2_log, et embulk est exécuté. Les variables définies sont utilisées dans le yml d'embulk. De plus, si une erreur s'y produit, elle est publiée sur ChatWork avec py>: info.ChatWork.postafin de pouvoir déterminer dans quelle tâche l'erreur s'est produite. Le JOB lui-même se terminera également si une erreur se produit. digdag gère la session, et si vous l'exécutez dans la même session,digdag run main.digpassera à la partie erreur telle quelle. Si vous voulez ignorer la session et recommencer depuis le début, utilisezdigdag run main.dig -a. Veuillez consulter la [Documentation](http://docs.digdag.io/index.html) pour les spécifications de cette zone. Dans l'exemple, target_date peut être défini comme argument, vous pouvez donc également spécifier digdag run main.dig -p target_date = 2016-12-10`.

L'exemple Embulk yml (entrée: MySQL, sortie: BigQuery) est le suivant.

in:
  type: mysql
  host: ${host}
  user: ${user}
  password: "${password}"
  database: ${database}
  table: ${table}
  select: "id,action,zip_url,zip_hash,zip_created_at,zip_size,summary_flg ,image_quality,created_at,updated_at"
  where: created_at >= ${start_unixtime} AND created_at < ${end_unixtime}
out:
  type: bigquery
  mode: append
  auth_method: json_key
  json_keyfile: /home/hoge/embulk/keyfile/json_keyfile.json
  path_prefix: /home/hoge/embulk/tmp/${dataset}/${table}
  source_format: CSV
  compression: GZIP
  project: ${project_id}
  dataset: ${dataset}
  auto_create_table: true
  table: ${table}
  schema_file: /home/hoge/embulk/schema/${dataset}/${table}.json
  delete_from_local_when_job_end: true

Les variables peuvent être référencées par $ {nom de variable}. Ici, puisque les colonnes sont spécifiées par SELECT, le fichier yml est référencé pour chaque table, mais si vous souhaitez sélectionner toutes les colonnes, vous pouvez le couvrir avec un modèle, afin de pouvoir en faire une configuration plus simple. pense. Les ensembles de données BigQuery, les partitions de table, etc. peuvent également être modifiés de manière dynamique si nécessaire.

Autre

Bien qu'il ne soit pas utilisé par notre équipe d'analyse, depuis la version 0.8.18 ou ultérieure, des opérateurs tels que «bq>», «bq_load>» et «gcs_wait>» peuvent être utilisés, de sorte que la gamme de choix lors du chargement dans BigQuery Je pense que cela s'est répandu. Eh bien, l'opérateur semble être capable de faire le sien, donc en ce sens on peut dire qu'il peut tout faire. ..

Résumé

Digdag vous permet de définir les relations et les dépendances parent-enfant de manière simple et intuitive, et bien sûr, il est parfaitement compatible avec embulk, et vous pouvez effectuer un traitement de flux de travail simple et flexible en acquérant et en définissant dynamiquement des variables. !! Si vous le comparez avec le capitaine Tsubasa, Digdag est comme Misugi-kun, qui parvient à coopérer avec l'environnement.

finalement

Tomorrow's CYBIRD Engineer Advent Calendar 2016, Jour 17 [@ cy-nana-obata](http://qiita.com/cy- C'est nana-obata). Il montrera le matériel jeune et plein d'espoir unique aux nouveaux diplômés! ?? J'ai hâte d'y être! !! !!

De plus, le jeu d'entraînement de football "BFB Champions" fourni par notre société est actuellement lié à "Captain Tsubasa", et Tsubasa-kun et Misaki-kun's Vous pouvez jouer la combinaison d'or originale en plus d'Eleven, donc si vous ne l'avez pas encore jouée, essayez-la! Il y a aussi Misugi-kun! !!

Recommended Posts

La combinaison dorée d'Embulk et de BigQuery brille encore plus avec Digdag
relation entre la série de nombres de Fibonacci et le nombre d'or
Visualisez la gamme d'insertions internes et externes avec python
Découvrez la puissance de l'accélération avec NumPy / SciPy
Jouez avec le mécanisme de mot de passe de GitHub Webhook et Python
Combinaison de récursif et de générateur
Combinaison de anyenv et direnv
[Objet obligatoire DI] Implémenter et comprendre le mécanisme de DI avec Go
Pour améliorer la réutilisabilité et la maintenabilité des flux de travail créés avec Luigi