Erstellen Sie im Voraus ein Dienstkonto, um die BigQuery-API zu verwenden, und laden Sie den Berechtigungsnachweis (JSON) herunter. https://cloud.google.com/docs/authentication/getting-started?hl=ja
Pipfile
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true
[dev-packages]
[packages]
google-cloud-bigquery = "*"
google-cloud-bigquery-datatransfer = "*"
[requires]
python_version = "3.8"
migrate_table.py
import json
from google.cloud import bigquery
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(
credentials=credentials,
project=credentials.project_id,
)
Geben Sie für "[PATH TO CREDENTIAL]" den JSON-Pfad des Berechtigungsnachweises an.
migrate_table.py
table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')
#Vorhandene Tabelle löschen
client.delete_table(table, not_found_ok=True)
#Eine Tabelle erstellen
schema = [
bigquery.SchemaField('id', 'INT64'),
bigquery.SchemaField('name', 'STRING')
]
client.create_table(bigquery.Table(table, schema=schema))
Geben Sie für "[DATASET NAME]" und "[TABLE NAME]" den Datensatznamen und den Tabellennamen des Erstellungsziels an.
Wenn das Flag "not_found_ok" beim Löschen einer vorhandenen Tabelle "False" lautet, tritt ein Fehler auf, wenn die Tabelle nicht vorhanden ist. Belassen Sie sie daher bei "True". Ein Bild wie "DROP TABLE IF EXISTS" in DDL.
Beim Erstellen einer Tabelle müssen Sie den Spaltennamen und den Spaltentyp als Schemadefinition angeben. Standardmäßig ist es eine nullfähige Spalte. Wenn Sie sie also zu einer Nicht-Null-Spalte machen möchten, geben Sie den Modus an
bigquery.SchemaField('id', 'INT64', mode='REQUIRED')
Wird besorgt.
Importieren Sie CSV-Daten als Anfangsdaten. Bereiten Sie die Daten, die dem Typ der Schemadefinition in CSV entsprechen, im Voraus vor.
import.csv
id, name
1, hogehoge
2, fugafuga
migrate_table.py
#Anfangsdaten importieren
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1
with open('import.csv', 'rb') as sourceData:
job = client.load_table_from_file(sourceData, table, job_config=job_config)
job.result()
Wenn Sie den CSV-Header überspringen möchten, können Sie die Anzahl der übersprungenen Zeilen mit skip_leading_rows
angeben.
migrate_view.py
import json
from google.cloud import bigquery
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(
credentials=credentials,
project=credentials.project_id,
)
table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')
client.delete_table(table, not_found_ok=True)
view = bigquery.Table(table)
view.view_query = 'SELECT * FROM dataset_name.table_name'
client.create_table(view)
Im Fall einer Ansicht entspricht der Ablauf fast dem einer realen Tabelle.
Der Unterschied besteht darin, dass im Fall einer Ansicht die SQL-Abfrage direkt in view_query
angegeben wird.
Sie müssen die Schemadefinition nicht angeben, da sie automatisch aus SQL-Abfragen erstellt wird.
Wenn Sie die Ausführungsabfrage- und Planungseinstellungen in einer Szene ändern möchten, in der Daten regelmäßig mithilfe der geplanten Abfrage aktualisiert werden.
migrate_schedule.py
import json
from google.cloud import bigquery
from google.oauth2 import service_account
from google.cloud import bigquery_datatransfer_v1
import google.protobuf.json_format
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery_datatransfer_v1.DataTransferServiceClient(
credentials=credentials
)
config = google.protobuf.json_format.ParseDict(
{
"name": '[RESOURCE NAME]',
"destination_dataset_id": '[DATASET NAME]',
"display_name": '[DISPLAY NAME]',
"data_source_id": "scheduled_query",
"params": {
"query": "SELECT * FROM dataset_name.table_name",
"destination_table_name_template": '[TABLE NAME]',
"write_disposition": "WRITE_TRUNCATE",
"partitioning_field": "",
},
"schedule": "every 24 hours",
},
bigquery_datatransfer_v1.types.TransferConfig(),
)
update_mask = {"paths": ["display_name", "params", "schedule"]}
response = client.update_transfer_config(
config, update_mask
)
Ich werde die Zeitplaneinstellungen (config
) und die Aktualisierungsmaske ( update_mask
) an update_transfer_config
übergeben.
update_mask
gibt an, welches Feld in config
aktualisiert werden soll. Einstellungen, die nicht in "update_mask" enthalten sind, werden nicht aktualisiert.
Die Details des Einstellwerts von config
sind wie folgt.
name Geben Sie den Ressourcennamen des Zeitplans an. Der Ressourcenname kann anhand der Konfigurationsinformationen des entsprechenden Zeitplans bestätigt werden, da die Liste der Zeitpläne in "Geplante Abfrage" der BigQuery-Konsole angezeigt wird.
destination_dataset_id Geben Sie den Namen des Datasets an, in dem die im Zeitplan ausgeführten Abfrageergebnisse gespeichert werden.
display_name Da es sich um den Anzeigenamen des Zeitplans handelt, geben Sie ihm einen beliebigen Namen.
params.query Gibt die auszuführende Abfrage an.
params.destination_table_name_template Geben Sie den Namen der Tabelle an, in der die im Zeitplan ausgeführten Abfrageergebnisse gespeichert werden.
params.write_disposition Gibt an, wie in der Tabelle gespeichert werden soll. Wenn Sie "WRITE_TRUNCATE" angeben, wird die vorhandene Tabelle durch das Ergebnis der Ausführungsabfrage ersetzt. Wenn "WRITE_APPEND" angegeben ist, wird das Ergebnis der Ausführungsabfrage zur vorhandenen Tabelle hinzugefügt.
schedule Geben Sie den Ausführungszeitpunkt des Zeitplans an. ex. Führen Sie jede Stunde ・ ・ ・ alle 1 Stunde aus Laufen Sie jeden Tag um Mitternacht ... jeden Tag 00:00
Wenn Sie BigQuery als Data Mart verwenden, ist es schwierig, Änderungen zu verwalten, wenn Sie eine Tabelle über die Konsole erstellen. Daher wird empfohlen, diese mit git zu verwalten, wenn Sie sie in den Code einfügen.
Recommended Posts