[PYTHON] Vorsichtsmaßnahmen bei der stündlichen Ausgabe an die BigQuery-Tabelle mit Luigi

Prozess-> Lokal-> Lokal für GCS-> Erstellt einen Job namens GCS für BigQuery

Luigi Reverse Reference Bitte beziehen Sie sich auf diesen Beitrag Ich habe einen Job namens Local-> GCS-> BigQuery für die stündliche Verarbeitung von Ergebnissen erstellt.

import luigi
import luigi_bigquery
import pandas as pd
from luigi.contrib.gcs import GCSClient, GCSTarget
from lib.gcp_client import GCPClient, BigqueryLoadTaskEx
from luigi.contrib.bigquery import BigqueryTarget, CreateDisposition, WriteDisposition, SourceFormat

class LoadToGcs( luigi_bigquery.Query ):

    time      = luigi.DateHourParameter()
    lconfig   = luigi.configuration.get_config()

    def requires( self ):
        #Vorherige Arbeit. Lokale Ausgabe von Dateien im TSV-Format
        return Calculate( time = self.time)

    def output( self ):
        path = 'gs://test_xxx/output' + self.time.strftime( '/%Y/%m/%d/%H' ) + '/output.txt'
        client = GCSClient( oauth_credentials = GCPClient( self.lconfig ).get_credentials() )
        return GCSTarget( path, client = client )

    def run( self ):
        with self.input().open('r') as input:
            results = pd.read_csv( input,  sep='\t' )

        with self.output().open('w') as output:
            results.to_csv( output, index=False, encoding='utf8' )


class LoadToTable( BigqueryLoadTaskEx ):

    time               = luigi.DateHourParameter()

    source_format      = SourceFormat.CSV
    #Ich möchte, dass Sie jede Stunde an den Tisch hängen
    write_disposition  = WriteDisposition.WRITE_APPEND
    create_disposition = CreateDisposition.CREATE_IF_NEEDED
    max_bad_records    = 0
    skip_leading_rows  = 1

    def requires( self ):
        #Lesen Sie Dateien aus GCS
        return LoadToGcs( time = self.time, )

    @property
    def schema_json( self ):
        return 'schemas/xxxx.json'

    def source_uris(self):
        return [ self.input().path ]

    def output(self):
        
        return BigqueryTarget(
                project_id = 'test_project',
                dataset_id = 'test_dataset',
                #Bild des Tabellennamens: Test_table_20161013
                table_id   = self.time.strftime( 'test_table' + '_%Y%m%d' ),
                client     = self.get_client()
        )

Luigis "BigqueryLoadTask" machte den Code wirklich einfach und beeindruckend.

Stolperpunkt

Nur Mitternacht wird in die Tabelle des Tages hochgeladen

Warum

Weil das Ziel eine große Abfrage ist.

** Aufgrund der Spezifikationen von BigQuery wird empfohlen, am Ende "_% Y% m% d" hinzuzufügen, wenn Sie das Datum in den Tabellennamen eingeben. ** Der Grund dafür ist, dass es in einem Dropdown-Menü zusammengefasst ist, sodass es in Bezug auf die Benutzeroberfläche sehr leicht zu erkennen ist, wenn die Anzahl der Tabellen groß ist. Referenz: http://tech.vasily.jp/entry/bigquery_data_platform

Für eine Tabelle pro Tag wird eine zusätzliche Verarbeitung 24 Mal durchgeführt. Da die Ausgabe jedoch "BigqueryTarget" ist und als nach 1 Uhr angehängt wird (sofern die Tabelle vorhanden ist), gilt sie als ausgeführt und der Job wird ohne Laden beendet.

write_disposition  = WriteDisposition.WRITE_APPEND

Ich dachte, es wäre ein Anhang, wenn ich das schreibe, aber aufgrund von Luigis Spezifikationen ist es absolut notwendig, zuerst zu Target zu gehen. (Natürlich)

table_id   = self.time.strftime( 'test_table' + '_%Y%m%d%H' )

Die schnellste Lösung besteht darin, sie bis zu diesem Zeitpunkt in den Tabellennamen einzufügen, aber die Anzahl der Tabellen steigt ständig um 24 pro Tag. Ich möchte es nicht tun, weil es nicht herunterfällt und die BigQuery ziemlich schmutzig wird.

Laden mit BigqueryClient in BigQuery

Setzen Sie Target auf local oder GCS, um eine leere Datei zu erstellen


#Gewöhnliche Luigi.Aufgabe verwenden
class LoadToTable( luigi.Task ):

    time      = luigi.DateHourParameter()
    lconfig   = luigi.configuration.get_config()

    @property
    def schema_json( self ):
        return 'schemas/xxxx.json'

    def requires( self ):
        return LoadToGcs( time = self.time, )

    def run( self ):
        #Verwenden Sie BigqueryClient
        bq_client = BigqueryClient(oauth_credentials=GCPClient(self.lconfig).get_credentials())
        with open( self.schema_json, 'r' ) as f:
            schema = json.load( f )

        project_id = 'test_project'
        dataset_id = 'test_dataset'
        table_id = 'test_table'

        job = {
            'configuration': {
                'load': {
                    'sourceUris': [
                        self.input().path
                    ],
                    'schema': {
                        'fields': schema
                    },
                    'destinationTable': {
                        'projectId': project_id,
                        'datasetId': dataset_id,
                        'tableId': table_id
                    },
                    'sourceFormat': SourceFormat.CSV,
                    'writeDisposition': WriteDisposition.WRITE_APPEND,
                    #Da die erste Zeile der Originaldaten der Spaltenname ist, fügen Sie ihn hinzu
                    'skipLeadingRows': 1,
                    'allowQuotedNewlines': 'true'
                }
            }
        }

        #In BigQuery laden
        bq_client.run_job(project_id, job, dataset=BQDataset(project_id=project_id, dataset_id=dataset_id))

        #Leere Datei erstellen
        if not self.dry:
            self.output().open('w').close()

    def output( self ):
        output_path = os.path.join(
                '/tmp/work', #Arbeitsverzeichnis
                self.time.strftime( '%Y-%m-%d' ),        #Datum
                self.time.strftime( '%H' ),              #Zeit
                str( self )                              #Aufgabennname
        )
        return luigi.LocalTarget( output_path )

Target legt lediglich eine leere Datei lokal als Beweis für die Jobausführung ab. Verlassen Sie sich nicht auf "BigqueryLoadTask" Es ist fertig.

Recommended Posts

Vorsichtsmaßnahmen bei der stündlichen Ausgabe an die BigQuery-Tabelle mit Luigi
Vorsichtsmaßnahmen bei der Eingabe von CSV mit Python und der Ausgabe an json, um exe zu erstellen
Ein Hinweis, dem ich beim Erstellen einer Tabelle mit SQL Alchemy verfallen war
Vorsichtsmaßnahmen bei der Installation von Tensorflow mit Anaconda
Vorsichtsmaßnahmen bei Verwendung von sechs mit Python 2.5
[memo] Senden Sie Tabelleninformationen mit BigQuery Schedule und Cloud-Funktionen an GCS
Zu beachtende Punkte bei der Lösung von DP-Problemen mit Python
So arbeiten Sie mit BigQuery in Python
Es ist bequemer, csv-table zu verwenden, wenn Sie eine Tabelle mit Python-Sphinx schreiben