Ich wollte die Tabellenkalkulationsinformationen in BigQuery hochladen und die Differenzdaten aktualisieren.
Ursprünglich wurde der Prozess gemäß dem folgenden Ablauf durchgeführt.
In diesem Fall wurden die Daten jedoch häufig nicht aktualisiert, selbst wenn die Daten unmittelbar nach dem Erstellen einer neuen Tabelle eingefügt wurden. Obwohl bei der Ausführung von SQL zu sehen ist, dass die Ausführung mit dem Status "Job-ID" abgeschlossen ist, Manchmal war der Zeitunterschied ziemlich groß und ich konnte ihn mit Schlaf einfügen, Ich suchte nach einer stabilen Methode, weil ich es nicht konnte.
Es wurde gelöst, indem der obige Fluss in den folgenden Fluss geändert wurde.
Für die Verarbeitung, bei der eine Tabelle erstellt und Daten separat, aber gleichzeitig hochgeladen werden Als ich es geändert habe, hat es gut funktioniert. Es ist natürlich zu fragen, aber ich wusste nicht, wie ich es schreiben sollte, also wie nach dem Erstellen der Tabelle Ich habe mir nur die Methoden angesehen, wie man den Abschluss der Ausführung erfasst und wie man eine Standby-Verarbeitung durchführt.
Diesmal werden nur die Differenzdaten der Informationen in der Tabelle aktualisiert Ich schreibe den Inhalt, um ihn zu reflektieren.
Tabellenkalkulationsdaten sind unten verfügbar.
Japanischer Name in der ersten Zeile, Spalte für BigQuery in der zweiten Zeile, Datentyp in der dritten Zeile, Es ist eine Datenstruktur, dass die tatsächlichen Daten in der 4. und den folgenden Zeilen enthalten sind.
Ich wollte eigentlich PK-Einstellungen einbinden, aber Dieses Mal ist es nicht als Zweck enthalten. Wenn also die ID in Spalte A übereinstimmt, wird sie aktualisiert Wenn sie nicht übereinstimmen, werden sie aktualisiert.
Die BigQuery-Tabelle wird wie folgt aktualisiert.
Ich habe die folgenden vier Dinge geübt.
① Erstellen Sie eine Tabelle, indem Sie SQL ausführen. ⇒ Aktualisieren ② Erstellen Sie eine Tabelle mit Google-Cloud-Bigquery. ⇒ Update von Pandas ③ Tabelle mit pandas-gbq erstellen / aktualisieren ④ Tabelle mit Google-Cloud-Big-Abfrage erstellen / aktualisieren
① und ② sind Muster, die nach dem Erstellen einer Tabelle im ursprünglich geplanten Ablauf nicht hochgeladen werden können. ③ und ④ sind Muster, bei denen gleichzeitig mit dem Erstellen der Tabelle Daten hochgeladen wurden.
Ich denke, dass einige Leute die gleichen Probleme haben, deshalb habe ich auch ① und ② aufgelistet.
Unten finden Sie die Code-Einführung.
Authentifizierung
from google.colab import auth
auth.authenticate_user()
Wenn Sie den obigen Vorgang ausführen, wird der folgende Authentifizierungscode angezeigt.
Ein anderer Artikel zeigt auch, wie es ohne Autorisierungscode geht. Wenn Sie mit Daten aus Tabellen verschiedener PJs zusammenführen möchten Ich denke, Sie sollten den Bestätigungscode jedes Mal eingeben.
Referenz: Ausführen mit Google Colaboratory ohne Autorisierungscode
Die Tabellenkalkulationsoperation und die BigQuery-Operation werden in einem separaten Artikel beschrieben, sodass Details weggelassen werden. So betreiben Sie Tabellenkalkulationen in Python (Verwendung von gspread) So betreiben Sie BigQuery mit Python
Vorbereiten der Verwendung der Tabelle
!pip install gspread
from google.colab import auth
from oauth2client.client import GoogleCredentials
import gspread
#Authentifizierungsprozess
auth.authenticate_user()
gc = gspread.authorize(GoogleCredentials.get_application_default())
ss_id = `Tabellenkalkulations-ID`
sht_name = 'Blattname'
workbook = gc.open_by_key(ss_id)
worksheet = workbook.worksheet(sht_name)
values = worksheet.get_all_values()
values
# [['ID', 'Name', 'Alter', 'Lieblingssachen'],
# ['id', 'name', 'age', 'favorit'],
# ['int64', 'string', 'int64', 'string'],
# ['1', 'Bob', '25', 'apple'],
# ['2', 'Tom', '32', 'orange'],
# ['3', 'Jay', '28', 'grape']]
Bereit zur Verwendung von BigQuery
project_id = 'GCP-Projekt-ID'
dateset = 'Datensatzname'
table_name = 'Tabellenname'
from google.cloud import bigquery
client = bigquery.Client(project=project_id)
Bis zu diesem Punkt wird es der Teil sein, den ① bis ④ gemeinsam haben.
Ich weiß nicht, ob der Ausdruck SQL-Ausführung korrekt ist. Diese Methode kann mit demselben SQL ausgeführt werden wie mit dem Abfrageeditor.
Ich denke, der Vorteil ist, dass Sie den Status anhand von "einfach zu schreiben" und "Job-ID" sehen können.
SQL-Erstellung für die Erstellung temporärer Tabellen
table_id = f'{project_id}.{dateset}.{table_name}'
query = f'create or replace table `{table_id}_temp`(\n'
for column, dtype in zip(values[1],values[2]):
query += f'{column} {dtype},\n'
query = query[:-2] + ')'
query
# create or replace table `{project_id}.{dateset}.{table_name}_temp`(
# id int64,
# name string,
# age int64,
# favorit string)
SQL-Ausführung
#Lauf
result = client.query(query)
#Überprüfen Sie den Auftragsstatus
client.get_job(result.job_id, location='asia-northeast1').state
# 'DONE'
Wie man die Tabelle bisher erstellt, unterscheidet sich zwischen ① und ③, Das SQL zum Aktualisieren der folgenden Unterschiede wird häufig verwendet.
SQL-Erstellung für Update
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''
insert_query = ''
for val in values[1][1:]:
query += f'{val} = S.{val},\n'
insert_query += f'{val}, '
insert_query = insert_query[:-2]
query = query[:-2] + '\n'
query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'
query
# merge into `{project_id}.{dateset}.{table_name}` as T
# using `{project_id}.{dateset}.{table_name}_temp` as S
# on T.id = S.id
# when matched then
# update set
# name = S.name,
# age = S.age,
# favorit = S.favorit
# when not matched then
# insert(id, name, age, favorit)
# values(id, name, age, favorit)
SQL-Ausführung zur Aktualisierung
client.query(query)
Nachdem Sie alles ausgeführt haben, sehen Sie sich das Abfrageergebnis in BigQuery an Es war erfolgreich, wie im Bild unten gezeigt.
Wenn Sie sich jedoch die Tabelle ansehen, befindet sie sich wie unten gezeigt in einem leeren Zustand. Der Status des Jobs ist "FERTIG", also reflektieren Sie ihn bitte ...
Die Daten werden aufgrund eines Serverproblems nach ca. 5 Minuten wiedergegeben. Es war nutzlos, weil ich es sofort ununterbrochen laufen lassen wollte.
In diesem Fall werden die Tabellenkalkulationsinformationen mit den hier aufgeführten Inhalten hochgeladen. https://googleapis.dev/python/bigquery/latest/usage/tables.html
Tabelle löschen und Schema erstellen
table_id = f'{project_id}.{dateset}.{table_name}'
client = bigquery.Client(project = project_id)
#Löschen, wenn eine Tabelle vorhanden ist
client.delete_table(f'{table_id}_temp', not_found_ok=True)
schema = []
for column, dtype in zip(values[1],values[2]):
schema.append(bigquery.SchemaField(column, dtype, mode="NULLABLE"))
schema
# [SchemaField('id', 'INT64', 'NULLABLE', None, ()),
# SchemaField('name', 'STRING', 'NULLABLE', None, ()),
# SchemaField('age', 'INT64', 'NULLABLE', None, ()),
# SchemaField('favorit', 'STRING', 'NULLABLE', None, ())]
Erstellen einer temporären Tabelle
table = bigquery.Table(f'{table_id}_temp', schema=schema)
client.create_table(table)
Das Folgende ist der gleiche Code wie ①, daher wird es zusammengefasst.
Erstellen Sie SQL und führen Sie es zur Aktualisierung aus
df = pd.DataFrame(values[3:], columns=values[1])
#Leere Löschung, leere Verarbeitung
df = df.replace({'': None})
df = df.dropna(subset=[values[1][0]])
#Update auf temporäre Tabelle
client.insert_rows(table, df.values.tolist())
#SQL für Update erstellen
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''
insert_query = ''
for val in values[1][1:]:
query += f'{val} = S.{val},\n'
insert_query += f'{val}, '
insert_query = insert_query[:-2]
query = query[:-2] + '\n'
query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'
#Führen Sie SQL aus, um Daten zu aktualisieren
client.query(query)
Trotzdem war die Wahrscheinlichkeit, dass eine Tabelle mit "person_temp" erstellt wurde, gering wie in ①. Ich habe diesen Code oft verwendet, wenn ich mit DP verknüpft habe. Zu diesem Zeitpunkt habe ich ungefähr 20 Sekunden lang geschlafen, damit es funktioniert.
Ich habe nicht viel erwartet, weil einige andere Blogs sagten, es sei böse. Ich bin froh, dass ich es versucht habe.
Erstellen eines DataFrame
#Erfordert die Installation zur Verwendung
!pip install pandas-gbq
df = pd.DataFrame(values[3:], columns=values[1])
#Leere Löschung, leere Verarbeitung
df = df.dropna(subset=[values[1][0]])
df = df.replace({'': None})
Durch das Erstellen dieses Schemas wird ein Wörterbuchtyp mit den Namen "Name" und "Typ" erstellt.
Erstellen eines Schemas
schema = []
for column, dtype in zip(values[1],values[2]):
schema.append({'name': column, 'type': dtype})
schema
# [{'name': 'id', 'type': 'int64'},
# {'name': 'name', 'type': 'string'},
# {'name': 'age', 'type': 'int64'},
# {'name': 'favorit', 'type': 'string'}]
Update auf temporäre Tabelle
df.to_gbq(f'{dateset}.{table_name}_temp', project_id, if_exists='replace', table_schema=schema)
Erstellen Sie SQL und führen Sie es zur Aktualisierung aus
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''
insert_query = ''
for val in values[1]:
query += f'{val} = S.{val},\n'
insert_query += f'{val}, '
insert_query = insert_query[:-2]
query = query[:-2] + '\n'
query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'
client.query(query)
Ich konnte hier problemlos aktualisieren. Wenn Sie sich jedoch die unten stehende Website ansehen, heißt es, dass Sie von "pandas-gbq" zu "google-cloud-bigquery" wechseln sollten. https://cloud.google.com/bigquery/docs/pandas-gbq-migration?hl=ja
Als nächstes habe ich die Methode mit "google-cloud-bigquery" beschrieben.
④ Tabelle mit Google-Cloud-Big-Abfrage erstellen / aktualisieren
Installieren Sie Pyarrow
#Installieren Sie Pyarrow
!pip install pyarrow
table_id = f'{project_id}.{dateset}.{table_name}'
df = pd.DataFrame(values[3:], columns=values[1])
#Leere Löschung, leere Verarbeitung
df = df.replace({'': None})
df = df.dropna(subset=[values[1][0]])
Erstellen eines Schemas und Ändern des Datentyps eines DataFrame
schema = []
for column, dtype in zip(values[1],values[2]):
schema.append(bigquery.SchemaField(column,dtype))
if dtype != 'string':
df[column] = df[column].astype(dtype)
schema
google-cloud-bigquery unterscheidet sich von pandas-gbq Ein Fehler tritt auf, wenn die Datentypen der Tabelle und des DataFrame nicht übereinstimmen. Die offizielle Einstellung ist feiner ...
Mit df.dtpyes können Sie überprüfen, ob sich der Datentyp geändert hat.
dtypes
df.dtypes
# id int64
# name object
# age int64
# favorit object
# dtype: object
Nehmen Sie als Nächstes die Einstellungen vor, die mit "load_table_from_dataframe ()" ausgeführt werden sollen, um sie später auszuführen.
config
bedeutet auf Japanisch config
.
Unten finden Sie eine Liste der Attribute, die festgelegt werden können. https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.LoadJobConfig.html
Dieses Mal werden "schema" und "write_disposition" so eingestellt, dass die vorhandene Tabelle überschrieben wird, wenn sie vorhanden ist.
Jon Einstellungen
job_config = bigquery.LoadJobConfig(
schema = schema,
write_disposition = 'WRITE_TRUNCATE'
)
Laden Sie df-Daten hoch
job = client.load_table_from_dataframe(
df, f'{table_id}_temp', job_config=job_config
)
# Wait for the load job to complete.
job.result()
Das Folgende ist der gleiche Code wie ①, daher wird es zusammengefasst.
Erstellen Sie SQL und führen Sie es zur Aktualisierung aus
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''
insert_query = ''
for val in values[1]:
query += f'{val} = S.{val},\n'
insert_query += f'{val}, '
insert_query = insert_query[:-2]
query = query[:-2] + '\n'
query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'
client.query(query)
Ich denke, dass ③ besser ist, weil es einfacher ist, den Datentyp anzugeben. Bewegt sich ④ schneller?
Eine andere Sache, über die ich mir Sorgen mache, ist, wenn die Datenmenge zunimmt. Da das Einfügungslimit in Google-Cloud-Big Query 10.000 Datensätze betrug Wenn Sie mehr als das tun möchten, müssen Sie möglicherweise jede Verarbeitung durchführen.
Ersetzen Sie in diesem Fall die Tabelle nur beim ersten Mal. Ab dem zweiten Mal scheint es kein Problem zu geben, wenn der Prozess aktiviert wird.
Recommended Posts