[PYTHON] Precautions when outputting to BigQuery table every hour with Luigi

I built a chain of jobs: process -> local file, local file -> GCS, GCS -> BigQuery.

Please also see my earlier post, the Luigi Reverse Lookup Reference. For each hour's processing results, I created a job that loads them Local -> GCS -> BigQuery.

import luigi
import luigi_bigquery
import pandas as pd
from luigi.contrib.gcs import GCSClient, GCSTarget
from lib.gcp_client import GCPClient, BigqueryLoadTaskEx
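#lib.gcp_client is a project-local helper module (not shown in this post):
#GCPClient supplies OAuth credentials, BigqueryLoadTaskEx is the BigQuery load task base class used below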
from luigi.contrib.bigquery import BigqueryTarget, CreateDisposition, WriteDisposition, SourceFormat

class LoadToGcs( luigi_bigquery.Query ):

    time      = luigi.DateHourParameter()
    lconfig   = luigi.configuration.get_config()

    def requires( self ):
        #Previous job. Outputting files locally in TSV format
        return Calculate( time = self.time)

    def output( self ):
        path = 'gs://test_xxx/output' + self.time.strftime( '/%Y/%m/%d/%H' ) + '/output.txt'
        client = GCSClient( oauth_credentials = GCPClient( self.lconfig ).get_credentials() )
        return GCSTarget( path, client = client )

    def run( self ):
        with self.input().open('r') as input:
            results = pd.read_csv( input,  sep='\t' )

        with self.output().open('w') as output:
            results.to_csv( output, index=False, encoding='utf8' )


class LoadToTable( BigqueryLoadTaskEx ):

    time               = luigi.DateHourParameter()

    source_format      = SourceFormat.CSV
    #I want you to append to the table every hour
    write_disposition  = WriteDisposition.WRITE_APPEND
    create_disposition = CreateDisposition.CREATE_IF_NEEDED
    max_bad_records    = 0
    skip_leading_rows  = 1

    def requires( self ):
        #Read files from GCS
        return LoadToGcs( time = self.time, )

    @property
    def schema_json( self ):
        return 'schemas/xxxx.json'

    def source_uris(self):
        return [ self.input().path ]

    def output(self):
        
        return BigqueryTarget(
                project_id = 'test_project',
                dataset_id = 'test_dataset',
                #Table name will look like: test_table_20161013
                table_id   = self.time.strftime( 'test_table' + '_%Y%m%d' ),
                client     = self.get_client()
        )

Luigi's BigqueryLoadTask kept the code really simple, which impressed me.
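To actually run this every hour, one option is to kick the final task from a small script driven by cron. This is only a minimal sketch under my own assumptions (the run hour below is hypothetical, and in practice cron or a central luigid scheduler would supply it):

import datetime
import luigi

#Minimal sketch: run the load for one specific hour with the local scheduler.
#In production, cron (or a central luigid scheduler) would drive this every hour.
if __name__ == '__main__':
    target_hour = datetime.datetime( 2016, 10, 13, 5 )
    luigi.build( [ LoadToTable( time = target_hour ) ], local_scheduler = True )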

Stumbling point

Only the midnight (00:00) run gets loaded into that day's table

Why?

Because the Target is BigQuery.

**Because of how BigQuery works, it is recommended to end the table name with _%Y%m%d.** The reason is that date-suffixed tables are collapsed into a single dropdown entry in the UI, which keeps things easy to read even when the number of tables grows. Reference: http://tech.vasily.jp/entry/bigquery_data_platform

The append load runs 24 times a day against a single daily table. However, because the output is a BigqueryTarget, from 01:00 onwards (once the table already exists) Luigi decides the task has already been executed and finishes without loading anything.

write_disposition  = WriteDisposition.WRITE_APPEND

I assumed this line would be enough to make the loads append, but by Luigi's design the Target is always checked before anything runs. (Naturally.)
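This is Luigi's default complete() check. Roughly (a simplified sketch, not the exact library code), it does the following before run() is ever scheduled:

import luigi

#Simplified version of luigi.Task.complete(): a task counts as done
#when every one of its output Targets already exists.
def complete( self ):
    outputs = luigi.task.flatten( self.output() )
    return all( output.exists() for output in outputs )

With a BigqueryTarget, exists() asks whether the destination table exists, so from the second run of the day onwards the task is considered complete and skipped.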

table_id   = self.time.strftime( 'test_table' + '_%Y%m%d%H' )

The quickest fix is to push the hour into the table name, but then the table count grows by 24 every day. Hourly tables like that are not collapsed into a dropdown, so the BigQuery UI gets quite messy, and I didn't want that.

How to load into BigQuery using BigqueryClient

Set the Target to a local (or GCS) path and just create an empty file there


import os
import json

import luigi
from luigi.contrib.bigquery import BigqueryClient, BQDataset, SourceFormat, WriteDisposition
from lib.gcp_client import GCPClient

#Use an ordinary luigi.Task instead of BigqueryLoadTaskEx
class LoadToTable( luigi.Task ):

    time      = luigi.DateHourParameter()
    lconfig   = luigi.configuration.get_config()

    @property
    def schema_json( self ):
        return 'schemas/xxxx.json'

    def requires( self ):
        return LoadToGcs( time = self.time, )

    def run( self ):
        #Use BigqueryClient
        bq_client = BigqueryClient(oauth_credentials=GCPClient(self.lconfig).get_credentials())
        with open( self.schema_json, 'r' ) as f:
            schema = json.load( f )

        project_id = 'test_project'
        dataset_id = 'test_dataset'
        table_id = self.time.strftime( 'test_table_%Y%m%d' )  #Daily table; appended to every hour

        job = {
            'configuration': {
                'load': {
                    'sourceUris': [
                        self.input().path
                    ],
                    'schema': {
                        'fields': schema
                    },
                    'destinationTable': {
                        'projectId': project_id,
                        'datasetId': dataset_id,
                        'tableId': table_id
                    },
                    'sourceFormat': SourceFormat.CSV,
                    'writeDisposition': WriteDisposition.WRITE_APPEND,
                    #The first row of the source data is the header row, so skip it
                    'skipLeadingRows': 1,
                    'allowQuotedNewlines': True
                }
            }
        }

        #Load into BigQuery
        bq_client.run_job(project_id, job, dataset=BQDataset(project_id=project_id, dataset_id=dataset_id))

        #Create an empty file as proof that the load ran
        self.output().open('w').close()

    def output( self ):
        output_path = os.path.join(
                '/tmp/work', #Working directory
                self.time.strftime( '%Y-%m-%d' ),        #date
                self.time.strftime( '%H' ),              #time
                str( self )                              #Task name
        )
        return luigi.LocalTarget( output_path )

The Target is now just an empty local file that serves as proof the job ran. The trick is simply not to rely on BigqueryLoadTask, and that's it.
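As a quick sanity check, this is roughly what the marker gives you (a hypothetical snippet; the exact task-name component of the path depends on how your Luigi version renders the task):

import datetime

task = LoadToTable( time = datetime.datetime( 2016, 10, 13, 5 ) )
print( task.output().path )   #e.g. /tmp/work/2016-10-13/05/LoadToTable(...)
print( task.complete() )      #False until the empty marker file has been written

If you ever need to force a particular hour to load again, delete that marker file and Luigi will re-run the task.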
