Luigi Reverse Reference Bitte beziehen Sie sich auf diesen Beitrag Ich habe einen Job namens Local-> GCS-> BigQuery für die stündliche Verarbeitung von Ergebnissen erstellt.
import luigi
import luigi_bigquery
import pandas as pd
from luigi.contrib.gcs import GCSClient, GCSTarget
from lib.gcp_client import GCPClient, BigqueryLoadTaskEx
from luigi.contrib.bigquery import BigqueryTarget, CreateDisposition, WriteDisposition, SourceFormat
class LoadToGcs( luigi_bigquery.Query ):
time = luigi.DateHourParameter()
lconfig = luigi.configuration.get_config()
def requires( self ):
#Vorherige Arbeit. Lokale Ausgabe von Dateien im TSV-Format
return Calculate( time = self.time)
def output( self ):
path = 'gs://test_xxx/output' + self.time.strftime( '/%Y/%m/%d/%H' ) + '/output.txt'
client = GCSClient( oauth_credentials = GCPClient( self.lconfig ).get_credentials() )
return GCSTarget( path, client = client )
def run( self ):
with self.input().open('r') as input:
results = pd.read_csv( input, sep='\t' )
with self.output().open('w') as output:
results.to_csv( output, index=False, encoding='utf8' )
class LoadToTable( BigqueryLoadTaskEx ):
time = luigi.DateHourParameter()
source_format = SourceFormat.CSV
#Ich möchte, dass Sie jede Stunde an den Tisch hängen
write_disposition = WriteDisposition.WRITE_APPEND
create_disposition = CreateDisposition.CREATE_IF_NEEDED
max_bad_records = 0
skip_leading_rows = 1
def requires( self ):
#Lesen Sie Dateien aus GCS
return LoadToGcs( time = self.time, )
@property
def schema_json( self ):
return 'schemas/xxxx.json'
def source_uris(self):
return [ self.input().path ]
def output(self):
return BigqueryTarget(
project_id = 'test_project',
dataset_id = 'test_dataset',
#Bild des Tabellennamens: Test_table_20161013
table_id = self.time.strftime( 'test_table' + '_%Y%m%d' ),
client = self.get_client()
)
Luigis "BigqueryLoadTask" machte den Code wirklich einfach und beeindruckend.
Warum
** Aufgrund der Spezifikationen von BigQuery wird empfohlen, am Ende "_% Y% m% d" hinzuzufügen, wenn Sie das Datum in den Tabellennamen eingeben. ** Der Grund dafür ist, dass es in einem Dropdown-Menü zusammengefasst ist, sodass es in Bezug auf die Benutzeroberfläche sehr leicht zu erkennen ist, wenn die Anzahl der Tabellen groß ist. Referenz: http://tech.vasily.jp/entry/bigquery_data_platform
Für eine Tabelle pro Tag wird eine zusätzliche Verarbeitung 24 Mal durchgeführt. Da die Ausgabe jedoch "BigqueryTarget" ist und als nach 1 Uhr angehängt wird (sofern die Tabelle vorhanden ist), gilt sie als ausgeführt und der Job wird ohne Laden beendet.
write_disposition = WriteDisposition.WRITE_APPEND
Ich dachte, es wäre ein Anhang, wenn ich das schreibe, aber aufgrund von Luigis Spezifikationen ist es absolut notwendig, zuerst zu Target zu gehen. (Natürlich)
table_id = self.time.strftime( 'test_table' + '_%Y%m%d%H' )
Die schnellste Lösung besteht darin, sie bis zu diesem Zeitpunkt in den Tabellennamen einzufügen, aber die Anzahl der Tabellen steigt ständig um 24 pro Tag. Ich möchte es nicht tun, weil es nicht herunterfällt und die BigQuery ziemlich schmutzig wird.
#Gewöhnliche Luigi.Aufgabe verwenden
class LoadToTable( luigi.Task ):
time = luigi.DateHourParameter()
lconfig = luigi.configuration.get_config()
@property
def schema_json( self ):
return 'schemas/xxxx.json'
def requires( self ):
return LoadToGcs( time = self.time, )
def run( self ):
#Verwenden Sie BigqueryClient
bq_client = BigqueryClient(oauth_credentials=GCPClient(self.lconfig).get_credentials())
with open( self.schema_json, 'r' ) as f:
schema = json.load( f )
project_id = 'test_project'
dataset_id = 'test_dataset'
table_id = 'test_table'
job = {
'configuration': {
'load': {
'sourceUris': [
self.input().path
],
'schema': {
'fields': schema
},
'destinationTable': {
'projectId': project_id,
'datasetId': dataset_id,
'tableId': table_id
},
'sourceFormat': SourceFormat.CSV,
'writeDisposition': WriteDisposition.WRITE_APPEND,
#Da die erste Zeile der Originaldaten der Spaltenname ist, fügen Sie ihn hinzu
'skipLeadingRows': 1,
'allowQuotedNewlines': 'true'
}
}
}
#In BigQuery laden
bq_client.run_job(project_id, job, dataset=BQDataset(project_id=project_id, dataset_id=dataset_id))
#Leere Datei erstellen
if not self.dry:
self.output().open('w').close()
def output( self ):
output_path = os.path.join(
'/tmp/work', #Arbeitsverzeichnis
self.time.strftime( '%Y-%m-%d' ), #Datum
self.time.strftime( '%H' ), #Zeit
str( self ) #Aufgabennname
)
return luigi.LocalTarget( output_path )
Target legt lediglich eine leere Datei lokal als Beweis für die Jobausführung ab. Verlassen Sie sich nicht auf "BigqueryLoadTask" Es ist fertig.
Recommended Posts