[PYTHON] 4 Möglichkeiten zum Aktualisieren von Unterschieden zwischen Tabellenkalkulationen und BigQuery

Ich wollte die Tabellenkalkulationsinformationen in BigQuery hochladen und die Differenzdaten aktualisieren.

ich war in Schwierigkeiten

Ursprünglich wurde der Prozess gemäß dem folgenden Ablauf durchgeführt.

図7.png

In diesem Fall wurden die Daten jedoch häufig nicht aktualisiert, selbst wenn die Daten unmittelbar nach dem Erstellen einer neuen Tabelle eingefügt wurden. Obwohl bei der Ausführung von SQL zu sehen ist, dass die Ausführung mit dem Status "Job-ID" abgeschlossen ist, Manchmal war der Zeitunterschied ziemlich groß und ich konnte ihn mit Schlaf einfügen, Ich suchte nach einer stabilen Methode, weil ich es nicht konnte.

Fazit

Es wurde gelöst, indem der obige Fluss in den folgenden Fluss geändert wurde.

図11.png

Für die Verarbeitung, bei der eine Tabelle erstellt und Daten separat, aber gleichzeitig hochgeladen werden Als ich es geändert habe, hat es gut funktioniert. Es ist natürlich zu fragen, aber ich wusste nicht, wie ich es schreiben sollte, also wie nach dem Erstellen der Tabelle Ich habe mir nur die Methoden angesehen, wie man den Abschluss der Ausführung erfasst und wie man eine Standby-Verarbeitung durchführt.

Was steht in diesem Artikel geschrieben

Diesmal werden nur die Differenzdaten der Informationen in der Tabelle aktualisiert Ich schreibe den Inhalt, um ihn zu reflektieren.

Tabellenkalkulationsdaten sind unten verfügbar.

図8.png

Japanischer Name in der ersten Zeile, Spalte für BigQuery in der zweiten Zeile, Datentyp in der dritten Zeile, Es ist eine Datenstruktur, dass die tatsächlichen Daten in der 4. und den folgenden Zeilen enthalten sind.

Ich wollte eigentlich PK-Einstellungen einbinden, aber Dieses Mal ist es nicht als Zweck enthalten. Wenn also die ID in Spalte A übereinstimmt, wird sie aktualisiert Wenn sie nicht übereinstimmen, werden sie aktualisiert.

Die BigQuery-Tabelle wird wie folgt aktualisiert.

図6.png

4 Möglichkeiten, es zu tun

Ich habe die folgenden vier Dinge geübt.

① Erstellen Sie eine Tabelle, indem Sie SQL ausführen. ⇒ Aktualisieren ② Erstellen Sie eine Tabelle mit Google-Cloud-Bigquery. ⇒ Update von Pandas ③ Tabelle mit pandas-gbq erstellen / aktualisieren ④ Tabelle mit Google-Cloud-Big-Abfrage erstellen / aktualisieren

① und ② sind Muster, die nach dem Erstellen einer Tabelle im ursprünglich geplanten Ablauf nicht hochgeladen werden können. ③ und ④ sind Muster, bei denen gleichzeitig mit dem Erstellen der Tabelle Daten hochgeladen wurden.

Ich denke, dass einige Leute die gleichen Probleme haben, deshalb habe ich auch ① und ② aufgelistet.

Unten finden Sie die Code-Einführung.

Gemeinsamer Teilecode

Authentifizierung


from google.colab import auth
auth.authenticate_user()

Wenn Sie den obigen Vorgang ausführen, wird der folgende Authentifizierungscode angezeigt. 図4.png

Ein anderer Artikel zeigt auch, wie es ohne Autorisierungscode geht. Wenn Sie mit Daten aus Tabellen verschiedener PJs zusammenführen möchten Ich denke, Sie sollten den Bestätigungscode jedes Mal eingeben.

Referenz: Ausführen mit Google Colaboratory ohne Autorisierungscode

Die Tabellenkalkulationsoperation und die BigQuery-Operation werden in einem separaten Artikel beschrieben, sodass Details weggelassen werden. So betreiben Sie Tabellenkalkulationen in Python (Verwendung von gspread) So betreiben Sie BigQuery mit Python

Vorbereiten der Verwendung der Tabelle


!pip install gspread
from google.colab import auth
from oauth2client.client import GoogleCredentials
import gspread

#Authentifizierungsprozess
auth.authenticate_user()
gc = gspread.authorize(GoogleCredentials.get_application_default())

ss_id = `Tabellenkalkulations-ID`
sht_name = 'Blattname'

workbook = gc.open_by_key(ss_id)
worksheet = workbook.worksheet(sht_name)
values = worksheet.get_all_values()

values
# [['ID', 'Name', 'Alter', 'Lieblingssachen'],
#  ['id', 'name', 'age', 'favorit'],
#  ['int64', 'string', 'int64', 'string'],
#  ['1', 'Bob', '25', 'apple'],
#  ['2', 'Tom', '32', 'orange'],
#  ['3', 'Jay', '28', 'grape']]

Bereit zur Verwendung von BigQuery


project_id = 'GCP-Projekt-ID'
dateset = 'Datensatzname'
table_name = 'Tabellenname'

from google.cloud import bigquery
client = bigquery.Client(project=project_id)

Bis zu diesem Punkt wird es der Teil sein, den ① bis ④ gemeinsam haben.

(1) Erstellen / Aktualisieren einer Tabelle durch Ausführen von SQL

Ich weiß nicht, ob der Ausdruck SQL-Ausführung korrekt ist. Diese Methode kann mit demselben SQL ausgeführt werden wie mit dem Abfrageeditor.

Ich denke, der Vorteil ist, dass Sie den Status anhand von "einfach zu schreiben" und "Job-ID" sehen können.

SQL-Erstellung für die Erstellung temporärer Tabellen


table_id = f'{project_id}.{dateset}.{table_name}'
query = f'create or replace table `{table_id}_temp`(\n'

for column, dtype in zip(values[1],values[2]):
  query += f'{column} {dtype},\n'
query = query[:-2] + ')'
query

# create or replace table `{project_id}.{dateset}.{table_name}_temp`(
# id int64,
# name string,
# age int64,
# favorit string)

SQL-Ausführung


#Lauf
result = client.query(query)

#Überprüfen Sie den Auftragsstatus
client.get_job(result.job_id, location='asia-northeast1').state

# 'DONE'

Wie man die Tabelle bisher erstellt, unterscheidet sich zwischen ① und ③, Das SQL zum Aktualisieren der folgenden Unterschiede wird häufig verwendet.

SQL-Erstellung für Update


query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''

insert_query = ''
for val in values[1][1:]:
  query += f'{val} = S.{val},\n'
  insert_query += f'{val}, '
insert_query = insert_query[:-2]

query = query[:-2] + '\n'

query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'
query

# merge into `{project_id}.{dateset}.{table_name}` as T
# using `{project_id}.{dateset}.{table_name}_temp` as S
# on T.id = S.id
# when matched then
# update set
# name = S.name,
# age = S.age,
# favorit = S.favorit
# when not matched then
# insert(id, name, age, favorit)
# values(id, name, age, favorit)

SQL-Ausführung zur Aktualisierung


client.query(query)

Nachdem Sie alles ausgeführt haben, sehen Sie sich das Abfrageergebnis in BigQuery an Es war erfolgreich, wie im Bild unten gezeigt.

図9.png

Wenn Sie sich jedoch die Tabelle ansehen, befindet sie sich wie unten gezeigt in einem leeren Zustand. Der Status des Jobs ist "FERTIG", also reflektieren Sie ihn bitte ...

図2.png

Die Daten werden aufgrund eines Serverproblems nach ca. 5 Minuten wiedergegeben. Es war nutzlos, weil ich es sofort ununterbrochen laufen lassen wollte.

② Tabelle mit Google-Cloud-Big-Abfrage erstellen / aktualisieren

In diesem Fall werden die Tabellenkalkulationsinformationen mit den hier aufgeführten Inhalten hochgeladen. https://googleapis.dev/python/bigquery/latest/usage/tables.html

Tabelle löschen und Schema erstellen


table_id = f'{project_id}.{dateset}.{table_name}'
client = bigquery.Client(project = project_id)

#Löschen, wenn eine Tabelle vorhanden ist
client.delete_table(f'{table_id}_temp', not_found_ok=True)

schema = []
for column, dtype in zip(values[1],values[2]):
  schema.append(bigquery.SchemaField(column, dtype, mode="NULLABLE"))
schema

# [SchemaField('id', 'INT64', 'NULLABLE', None, ()),
#  SchemaField('name', 'STRING', 'NULLABLE', None, ()),
#  SchemaField('age', 'INT64', 'NULLABLE', None, ()),
#  SchemaField('favorit', 'STRING', 'NULLABLE', None, ())]

Erstellen einer temporären Tabelle


table = bigquery.Table(f'{table_id}_temp', schema=schema)
client.create_table(table)

Das Folgende ist der gleiche Code wie ①, daher wird es zusammengefasst.

Erstellen Sie SQL und führen Sie es zur Aktualisierung aus


df = pd.DataFrame(values[3:], columns=values[1])
#Leere Löschung, leere Verarbeitung
df = df.replace({'': None})
df = df.dropna(subset=[values[1][0]])

#Update auf temporäre Tabelle
client.insert_rows(table, df.values.tolist())

#SQL für Update erstellen
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''

insert_query = ''
for val in values[1][1:]:
  query += f'{val} = S.{val},\n'
  insert_query += f'{val}, '
insert_query = insert_query[:-2]

query = query[:-2] + '\n'

query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'

#Führen Sie SQL aus, um Daten zu aktualisieren
client.query(query)

Trotzdem war die Wahrscheinlichkeit, dass eine Tabelle mit "person_temp" erstellt wurde, gering wie in ①. Ich habe diesen Code oft verwendet, wenn ich mit DP verknüpft habe. Zu diesem Zeitpunkt habe ich ungefähr 20 Sekunden lang geschlafen, damit es funktioniert.

③ Tabelle mit pandas-gbq erstellen / aktualisieren

Ich habe nicht viel erwartet, weil einige andere Blogs sagten, es sei böse. Ich bin froh, dass ich es versucht habe.

Erstellen eines DataFrame


#Erfordert die Installation zur Verwendung
!pip install pandas-gbq

df = pd.DataFrame(values[3:], columns=values[1])
#Leere Löschung, leere Verarbeitung
df = df.dropna(subset=[values[1][0]])
df = df.replace({'': None})

Durch das Erstellen dieses Schemas wird ein Wörterbuchtyp mit den Namen "Name" und "Typ" erstellt.

Erstellen eines Schemas


schema = []
for column, dtype in zip(values[1],values[2]):
  schema.append({'name': column, 'type': dtype})
schema

# [{'name': 'id', 'type': 'int64'},
#  {'name': 'name', 'type': 'string'},
#  {'name': 'age', 'type': 'int64'},
#  {'name': 'favorit', 'type': 'string'}]

Update auf temporäre Tabelle


df.to_gbq(f'{dateset}.{table_name}_temp', project_id, if_exists='replace', table_schema=schema)

Erstellen Sie SQL und führen Sie es zur Aktualisierung aus


query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''

insert_query = ''
for val in values[1]:
  query += f'{val} = S.{val},\n'
  insert_query += f'{val}, '
insert_query = insert_query[:-2]

query = query[:-2] + '\n'

query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'

client.query(query)

Ich konnte hier problemlos aktualisieren. Wenn Sie sich jedoch die unten stehende Website ansehen, heißt es, dass Sie von "pandas-gbq" zu "google-cloud-bigquery" wechseln sollten. https://cloud.google.com/bigquery/docs/pandas-gbq-migration?hl=ja

Als nächstes habe ich die Methode mit "google-cloud-bigquery" beschrieben.

④ Tabelle mit Google-Cloud-Big-Abfrage erstellen / aktualisieren

Installieren Sie Pyarrow


#Installieren Sie Pyarrow
!pip install pyarrow

table_id = f'{project_id}.{dateset}.{table_name}'

df = pd.DataFrame(values[3:], columns=values[1])
#Leere Löschung, leere Verarbeitung
df = df.replace({'': None})
df = df.dropna(subset=[values[1][0]])

Erstellen eines Schemas und Ändern des Datentyps eines DataFrame


schema = []
for column, dtype in zip(values[1],values[2]):
  schema.append(bigquery.SchemaField(column,dtype))
  if dtype != 'string':
    df[column] = df[column].astype(dtype)
schema

google-cloud-bigquery unterscheidet sich von pandas-gbq Ein Fehler tritt auf, wenn die Datentypen der Tabelle und des DataFrame nicht übereinstimmen. Die offizielle Einstellung ist feiner ...

Mit df.dtpyes können Sie überprüfen, ob sich der Datentyp geändert hat.

dtypes


df.dtypes

# id          int64
# name       object
# age         int64
# favorit    object
# dtype: object

Nehmen Sie als Nächstes die Einstellungen vor, die mit "load_table_from_dataframe ()" ausgeführt werden sollen, um sie später auszuführen. config bedeutet auf Japanisch config.

Unten finden Sie eine Liste der Attribute, die festgelegt werden können. https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.LoadJobConfig.html

Dieses Mal werden "schema" und "write_disposition" so eingestellt, dass die vorhandene Tabelle überschrieben wird, wenn sie vorhanden ist.

Jon Einstellungen


job_config = bigquery.LoadJobConfig(
  schema = schema,
  write_disposition = 'WRITE_TRUNCATE'
)

Laden Sie df-Daten hoch


job = client.load_table_from_dataframe(
  df, f'{table_id}_temp', job_config=job_config
)

# Wait for the load job to complete.
job.result()

Das Folgende ist der gleiche Code wie ①, daher wird es zusammengefasst.

Erstellen Sie SQL und führen Sie es zur Aktualisierung aus


query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''

insert_query = ''
for val in values[1]:
  query += f'{val} = S.{val},\n'
  insert_query += f'{val}, '
insert_query = insert_query[:-2]

query = query[:-2] + '\n'

query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'

client.query(query)

Ich denke, dass ③ besser ist, weil es einfacher ist, den Datentyp anzugeben. Bewegt sich ④ schneller?

Eine andere Sache, über die ich mir Sorgen mache, ist, wenn die Datenmenge zunimmt. Da das Einfügungslimit in Google-Cloud-Big Query 10.000 Datensätze betrug Wenn Sie mehr als das tun möchten, müssen Sie möglicherweise jede Verarbeitung durchführen.

Ersetzen Sie in diesem Fall die Tabelle nur beim ersten Mal. Ab dem zweiten Mal scheint es kein Problem zu geben, wenn der Prozess aktiviert wird.

Recommended Posts

4 Möglichkeiten zum Aktualisieren von Unterschieden zwischen Tabellenkalkulationen und BigQuery
Aktualisieren Sie python-social-auth von 0.1.x auf 0.2.x.
Aktualisieren Sie Mac Python von 2 auf 3
So aktualisieren Sie Google Sheets von Python
Änderungen von Python 3.0 zu Python 3.5
So aktualisieren Sie easy_install
Übergang von WSL1 zu WSL2
So aktualisieren Sie Spyder
Verwenden Sie BigQuery aus Python.
Führen Sie BigQuery von Lambda aus
Aktualisieren Sie die Django-Version 1.11.1 auf 2.2
Von der Bearbeitung bis zur Ausführung