The golden combination of embulk and BigQuery shines even more with Digdag

Introduction

This is day 16 of the CYBIRD Engineer Advent Calendar 2016, written again this year by @yuichi_komatsu, a data analysis engineer. We are always looking for colleagues to dig in with us; if you are interested, start here!

Yesterday's post was @sakamoto_koji's "Knowledge gained from server-side development of subscriptions": practical, valuable tips earned through real struggles in the field. Great stuff!

Now, on to the main topic.

This story

Last year I wrote "The story of embulk and BigQuery being too golden" (which I accidentally deleted while replying to a comment... I'm sorry). This year there is a new force: **creating a workflow with Digdag makes the Embulk and BigQuery combination even more powerful!** That is the story.

What is Digdag?

It's not a digging game. Digdag is an OSS workflow engine from Treasure Data. Many departments in our company use Jenkins, but unlike Jenkins, Digdag has no GUI (under development?): you write a file called a dig file in a YAML-like syntax and execute jobs with it. Similar products include Luigi and Airflow. Our department used Luigi for a while, but compared to it Digdag is very intuitive, easy to pick up, and flexible (personal impression), and it does not demand Python skills the way Luigi does. Please refer to the documentation here for installing Digdag and other details.

How to use (modes)

Digdag has three modes:

・Local mode
・Server mode
・Client mode

At the moment, though, we run everything on a single server in local mode, because that meets our requirements. Here I would like to introduce some of the ways our analysis team uses it.

So, without further ado, here is a configuration example.

Configuration example (parent dig file: main.dig)

timezone: Asia/Tokyo

schedule:
    daily>: 1:00:00

+main:
    _export:
        host: 'XXX.XXX.XXX.XXX'
        user: 'hoge'
        password: 'hoge_password'
        database: 'testdb'
        project_id: 'sample_project'
        dataset: 'hoge_dataset'

    +date:
        py>: date.SetDate.set_date

    +all_load:
        _parallel: true

        +load_log:
            !include : 'hoge/log.dig'
        +load_user:
            !include : 'hoge/user.dig'
        +load_master:
            !include : 'hoge/master.dig'

This is the dig file we use when loading the DB (MySQL) logs of one of our games into BigQuery with Embulk; it is the parent dig file that defines the common parts. The job is scheduled by running the scheduler in the background with `./digdag scheduler &` and defining `schedule:` as shown above. Under the `+main` task, `_export:` first defines the variables used by the tasks that follow: the MySQL access information used by the Embulk input, and the BigQuery `project_id`, `dataset`, and so on used by the output. The `py>:` in `+date` obtains the target date in Python. Since the date data stored in the DB differs by game (unixtime for some, datetime for others), either can be specified. For reference, the Python script is shown below.

__init__.py


# -*- coding: utf-8 -*-
import digdag
import time
from datetime import datetime, timedelta
from pytz import timezone

class SetDate(object):
    def set_date(self, target_date=''):
        if target_date:
            # target_date argument was given: start condition is that day
            start_datetime = datetime.strptime(target_date, '%Y-%m-%d')
            # end condition is the following day
            end_datetime = start_datetime + timedelta(days=1)
        else:
            # no target_date argument: target day is yesterday (JST)
            jst_now = datetime.now(timezone('Asia/Tokyo'))
            target_date = (jst_now - timedelta(days=1)).strftime('%Y-%m-%d')
            # start condition: midnight of the target day
            start_datetime = datetime.strptime(target_date, '%Y-%m-%d')
            # end condition: midnight of today
            end_datetime = datetime.strptime(jst_now.strftime('%Y-%m-%d'), '%Y-%m-%d')

        # convert to unixtime
        start_unixtime = int(time.mktime(start_datetime.timetuple()))
        end_unixtime = int(time.mktime(end_datetime.timetuple()))

        # convert to str
        start_datetime = str(start_datetime)
        end_datetime = str(end_datetime)

        # store as digdag variables, available to later tasks
        digdag.env.store({"target_date": target_date,
                          "start_unixtime": start_unixtime,
                          "end_unixtime": end_unixtime,
                          "start_datetime": start_datetime,
                          "end_datetime": end_datetime})

By importing `digdag` and calling `digdag.env.store`, the stored values become available as variables in subsequent tasks. Here we obtain the date values used in the Embulk yml files and in the ChatWork notification script. Place the script as `__init__.py` under the digdag execution directory; in this example it is placed as `date/__init__.py`.
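As a quick illustration (a hypothetical task, not part of the original workflow), a later `py>` task can receive these stored values simply by declaring arguments with matching names, the same mechanism by which `set_date` receives `target_date`:

# -*- coding: utf-8 -*-
# check/__init__.py -- hypothetical py> task that reads back stored variables
class Check(object):
    def show(self, target_date='', start_unixtime=0, end_unixtime=0):
        # digdag passes variables whose names match these method arguments
        print('loading {0} ({1} - {2})'.format(target_date, start_unixtime, end_unixtime))

It could then be invoked from a dig file with `py>: check.Check.show`.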

Going back to the parent dig file: in `+all_load`, the child tasks below it are executed in parallel because `_parallel:` is set to `true`. You can also include other dig files with `!include :`. Here, `log.dig`, `user.dig`, and `master.dig` are run in parallel.

Below is a sample of log.dig.

Configuration example (child dig file: log.dig)

+log:

    _export:
    #----------------#
    # Config by TYPE #
    #----------------#
        process: 'log'

    +sample1_log:
        _export:
            table: 'sample1_log'
        _error:
            _export:
                status: 'ERROR'
            py>: info.ChatWork.post
        embulk>: hoge/yml/log/sample1_log.yml

    +sample2_log:
        _export:
            table: 'sample2_log'
        _error:
            _export:
                status: 'ERROR'
            py>: info.ChatWork.post
        embulk>: hoge/yml/log/sample2_log.yml


#(Omitted)


    +post:
    # SUCCESS info to Chatwork
        _export:
            job: 'ALL'
            status: 'SUCCESS'
        py>: info.ChatWork.post

The `table` variable is set in the `_export:` of `+sample1_log` and `+sample2_log`, and then Embulk runs; the exported variables are referenced in Embulk's yml. If an error occurs in a task, it is posted to ChatWork via `py>: info.ChatWork.post`, so we can tell in which task the error occurred. The job itself ends when an error occurs. digdag manages sessions: if you run `digdag run main.dig` again in the same session, it skips straight to the failed task. If you want to ignore the session and start over from the beginning, use `digdag run main.dig -a`. Please refer to the documentation for the details of this behavior. In this example, `target_date` can be passed as an argument, so you can also run `digdag run main.dig -p target_date=2016-12-10`.
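The `info.ChatWork.post` script itself is not shown in this post, but a minimal sketch could look like the following, assuming the `requests` library and placeholder values for the ChatWork API token and room ID (placed as `info/__init__.py` under the digdag execution directory, like the date script):

# -*- coding: utf-8 -*-
# info/__init__.py -- minimal sketch of the notification script (assumed
# implementation; the token and room ID below are placeholders)
import requests

CHATWORK_TOKEN = 'XXXXXXXXXX'  # hypothetical API token
ROOM_ID = '12345678'           # hypothetical room ID

class ChatWork(object):
    def post(self, process='', table='', job='', status=''):
        # digdag passes the exported variables that match these argument names
        body = '[{0}] process={1} table={2} job={3}'.format(status, process, table, job)
        requests.post(
            'https://api.chatwork.com/v2/rooms/{0}/messages'.format(ROOM_ID),
            headers={'X-ChatWorkToken': CHATWORK_TOKEN},
            data={'body': body},
        )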

The embulk yml sample (input: MySQL, output: BigQuery) is as follows.

in:
  type: mysql
  host: ${host}
  user: ${user}
  password: "${password}"
  database: ${database}
  table: ${table}
  select: "id,action,zip_url,zip_hash,zip_created_at,zip_size,summary_flg,image_quality,created_at,updated_at"
  where: created_at >= ${start_unixtime} AND created_at < ${end_unixtime}
out:
  type: bigquery
  mode: append
  auth_method: json_key
  json_keyfile: /home/hoge/embulk/keyfile/json_keyfile.json
  path_prefix: /home/hoge/embulk/tmp/${dataset}/${table}
  source_format: CSV
  compression: GZIP
  project: ${project_id}
  dataset: ${dataset}
  auto_create_table: true
  table: ${table}
  schema_file: /home/hoge/embulk/schema/${dataset}/${table}.json
  delete_from_local_when_job_end: true

Variables can be referenced as `${variable_name}`. Here, because the columns are specified with SELECT, a separate yml file is referenced for each table; if you simply want all columns, one template can cover every table, which makes the configuration even simpler. BigQuery datasets, table partitions, and so on can also be switched dynamically as needed.
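For example, to load into date-partitioned tables, a script could store a partition suffix (a hypothetical extension, not part of the setup above) and the yml could append it to the table name as a `$YYYYMMDD` partition decorator:

# -*- coding: utf-8 -*-
# Hypothetical extension: store yesterday's date as a BigQuery partition suffix
import digdag
from datetime import datetime, timedelta
from pytz import timezone

class SetPartition(object):
    def set_partition(self):
        jst_now = datetime.now(timezone('Asia/Tokyo'))
        # e.g. '20161210', so the output table becomes sample1_log$20161210
        digdag.env.store({'partition': (jst_now - timedelta(days=1)).strftime('%Y%m%d')})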

Other

Although our analysis team does not use them, operators such as `bq>`, `bq_load>`, and `gcs_wait>` are available since version 0.8.18, which broadens the options for loading into BigQuery. You can also write your own operators, so in that sense you could say Digdag can do just about anything.

Summary

Digdag lets you define parent-child relationships and dependencies between tasks simply and intuitively; it is, of course, a perfect match for Embulk, and by acquiring and setting variables dynamically you can build simple yet flexible workflows. If I were to compare it to Captain Tsubasa, Digdag is like Misugi-kun, the player who skillfully coordinates everyone around him.

Finally

Tomorrow, day 17 of the CYBIRD Engineer Advent Calendar 2016 is brought to you by @cy-nana-obata (http://qiita.com/cy-nana-obata). Expect the fresh, promising material that only a new graduate can deliver!? Look forward to it!

By the way, the soccer training game "BFB Champions" from our company is currently running a tie-up with "Captain Tsubasa": in addition to the Golden Eleven, you can play the original golden combination of Tsubasa-kun and Misaki-kun, so if you haven't tried it yet, please give it a go! Misugi-kun is in it too!
