[PYTHON] 4 ways to update diffs from spreadsheets to BigQuery

What I wanted to do was upload spreadsheet data to BigQuery and update only the changed (delta) rows.

What I was struggling with

Originally, the process followed the flow below.

図7.png

However, with this flow, the data often did not show up even though I inserted it immediately after creating the new table. The job ID status showed that the SQL had completed, yet there was sometimes a large lag before the table was actually usable. Adding a sleep sometimes made the insert succeed and sometimes did not, so I went looking for a more reliable method.
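For reference, the pattern that kept failing looked roughly like the sketch below (the project, dataset, and table names are placeholders, and the sleep was only a stopgap):

import time
from google.cloud import bigquery

client = bigquery.Client(project='my-project')  # placeholder project ID

# 1. Create the temp table by running SQL
job = client.query(
    'create or replace table `my-project.my_dataset.person_temp`(id int64, name string)'
)

# 2. The job itself reports DONE fairly quickly...
print(client.get_job(job.job_id, location='asia-northeast1').state)  # 'DONE'

# 3. ...but inserting right away often failed, so a sleep was wedged in
time.sleep(20)
table = client.get_table('my-project.my_dataset.person_temp')
client.insert_rows(table, [(1, 'Bob'), (2, 'Tom')])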

Conclusion

I solved it by changing the flow above to the one below.

図11.png

Instead of creating the table and uploading the data as separate steps, I changed the process so that the table is created and the data is uploaded at the same time, and it worked fine. In hindsight this seems obvious, but I didn't know how to write it that way; after creating the table I had been stuck on questions like "how do I detect that the table creation has finished?" and "how do I wait before inserting?".
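In short, instead of CREATE TABLE followed by a separate insert, a single load job can create (or replace) the table and write the rows in one step. A minimal sketch of the idea, with placeholder names (the full version is ④ below):

import pandas as pd
from google.cloud import bigquery  # load_table_from_dataframe also needs pyarrow installed

client = bigquery.Client(project='my-project')  # placeholder project ID
df = pd.DataFrame({'id': [1, 2], 'name': ['Bob', 'Tom']})

# One load job both (re)creates the table and loads the rows
job_config = bigquery.LoadJobConfig(write_disposition='WRITE_TRUNCATE')
job = client.load_table_from_dataframe(
    df, 'my-project.my_dataset.person_temp', job_config=job_config
)
job.result()  # returns once the table exists and the rows are in it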

What this article covers

This article describes how to reflect only the diff of the data updated in the spreadsheet into BigQuery.

The spreadsheet data looks like this.

図8.png

Row 1 holds the Japanese column names, row 2 the BigQuery column names, row 3 the data types, and the actual data starts from row 4.

I would have liked to set up a primary key, but that is outside the scope of this article, so the logic is simply: if the ID in column A matches an existing row, that row is updated; if it does not match, a new row is inserted.

The BigQuery table will be updated as follows:

図6.png

4 ways to do it

I tried the following four approaches.

① Create table by executing SQL ⇒ update
② Create a table with google-cloud-bigquery ⇒ update from pandas
③ Create / update table with pandas-gbq
④ Create / update table with google-cloud-bigquery

① and ② follow the originally planned flow and could not upload reliably after creating the table. ③ and ④ are the patterns that succeeded by uploading the data at the same time the table is created.

Since others may run into the same trouble, I have included ① and ② as well.

The code for each approach follows.

Common part code

Authentication


from google.colab import auth
auth.authenticate_user()

When you run the above, a prompt asking for an authentication code is displayed, as shown below.

図4.png

There is also a way to do this without an authorization code (see the reference below), but if you want to join data across tables in different GCP projects, I think it is better to enter the verification code each time.

Reference: How to run with Google Colaboratory without authorization code

Spreadsheet and BigQuery operations are covered in separate articles, so I will omit the details here:
How to operate spreadsheets in Python (how to use gspread)
How to work with BigQuery in Python

Preparing to use the spreadsheet


!pip install gspread
from google.colab import auth
from oauth2client.client import GoogleCredentials
import gspread

#Authentication process
auth.authenticate_user()
gc = gspread.authorize(GoogleCredentials.get_application_default())

ss_id = 'Spreadsheet ID'
sht_name = 'Sheet name'

workbook = gc.open_by_key(ss_id)
worksheet = workbook.worksheet(sht_name)
values = worksheet.get_all_values()

values
# [['ID', 'name', 'age', 'Favorite things'],
#  ['id', 'name', 'age', 'favorit'],
#  ['int64', 'string', 'int64', 'string'],
#  ['1', 'Bob', '25', 'apple'],
#  ['2', 'Tom', '32', 'orange'],
#  ['3', 'Jay', '28', 'grape']]

Preparing to use BigQuery


import pandas as pd  # pandas is used later when building DataFrames
from google.cloud import bigquery

project_id = 'GCP project ID'
dateset = 'Data set name'
table_name = 'table name'

client = bigquery.Client(project=project_id)

Everything up to this point is shared by ① through ④.

① Create / update table by executing SQL

I'm not sure "executing SQL" is the right way to put it, but this is a method that runs the same SQL you would run in the query editor.

I think the merit is that it is easy to write and that you can check the status via the job ID.

SQL creation for the temp table


table_id = f'{project_id}.{dateset}.{table_name}'
query = f'create or replace table `{table_id}_temp`(\n'

for column, dtype in zip(values[1],values[2]):
  query += f'{column} {dtype},\n'
query = query[:-2] + ')'
query

# create or replace table `{project_id}.{dateset}.{table_name}_temp`(
# id int64,
# name string,
# age int64,
# favorit string)

SQL execution


#Run
result = client.query(query)

#Check job status
client.get_job(result.job_id, location='asia-northeast1').state

# 'DONE'
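As an aside, instead of polling get_job, the QueryJob returned by client.query() can be waited on directly; this guarantees the DDL job has finished, although (as described above) it did not remove the lag before the new table reliably accepted inserts:

# continuing from the cell above
result = client.query(query)
result.result()       # block until the CREATE TABLE job completes
print(result.state)   # 'DONE'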

Only the way the table is created differs between the approaches; the SQL below for applying the diff is used by all of them.

SQL creation for update


query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''

# insert must list every column (including the ID);
# update set covers every column except the ID
insert_query = ', '.join(values[1])

for val in values[1][1:]:
  query += f'{val} = S.{val},\n'
query = query[:-2] + '\n'

query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'
query

# merge into `{project_id}.{dateset}.{table_name}` as T
# using `{project_id}.{dateset}.{table_name}_temp` as S
# on T.id = S.id
# when matched then
# update set
# name = S.name,
# age = S.age,
# favorit = S.favorit
# when not matched then
# insert(id, name, age, favorit)
# values(id, name, age, favorit)

SQL execution for update


client.query(query)

After running everything, the query results screen in BigQuery shows success, as in the image below.

図9.png

However, when I look at the table itself, it looks like the following. The job status is DONE, so the data should be reflected ...

図2.png

The data finally shows up after about five minutes, apparently due to something on the server side. Since I wanted to run the steps back to back without waiting, this was not usable.

② Create / update table with google-cloud-bigquery

In this case, the spreadsheet data is uploaded following the approach described here: https://googleapis.dev/python/bigquery/latest/usage/tables.html

Delete table and create schema


table_id = f'{project_id}.{dateset}.{table_name}'
client = bigquery.Client(project = project_id)

#Delete if there is a table
client.delete_table(f'{table_id}_temp', not_found_ok=True)

schema = []
for column, dtype in zip(values[1],values[2]):
  schema.append(bigquery.SchemaField(column, dtype, mode="NULLABLE"))
schema

# [SchemaField('id', 'INT64', 'NULLABLE', None, ()),
#  SchemaField('name', 'STRING', 'NULLABLE', None, ()),
#  SchemaField('age', 'INT64', 'NULLABLE', None, ()),
#  SchemaField('favorit', 'STRING', 'NULLABLE', None, ())]

Creating a temp table


table = bigquery.Table(f'{table_id}_temp', schema=schema)
client.create_table(table)

The part below is the same code as in ①, so it is shown all in one block.

Create and execute SQL for update


df = pd.DataFrame(values[3:], columns=values[1])
# Convert blanks to None and drop rows with no ID
df = df.replace({'': None})
df = df.dropna(subset=[values[1][0]])

# Insert the rows into the temp table
client.insert_rows(table, df.values.tolist())

#Creating SQL for update
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''

# insert must list every column (including the ID);
# update set covers every column except the ID
insert_query = ', '.join(values[1])

for val in values[1][1:]:
  query += f'{val} = S.{val},\n'
query = query[:-2] + '\n'

query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'

#Run SQL to update data
client.query(query)

Even with this, just as in ①, the chance that the person_temp table was actually ready for the insert was low. I used to use this code a lot when linking with DP, and back then I added a sleep of about 20 seconds to make it work.
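For the record, that stopgap looked roughly like this (not recommended, shown only for context):

import time

client.create_table(table)
time.sleep(20)  # crude wait until the newly created table became usable
client.insert_rows(table, df.values.tolist())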

③ Create / update table with pandas-gbq

I didn't expect much from this one because another blog described it as bad practice, but I'm glad I tried it.

Creating a DataFrame


# pandas-gbq must be installed before use
!pip install pandas-gbq

df = pd.DataFrame(values[3:], columns=values[1])
# Convert blanks to None, then drop rows with no ID
df = df.replace({'': None})
df = df.dropna(subset=[values[1][0]])

For this schema, create a list of dicts with 'name' and 'type' keys.

Creating schema


schema = []
for column, dtype in zip(values[1],values[2]):
  schema.append({'name': column, 'type': dtype})
schema

# [{'name': 'id', 'type': 'int64'},
#  {'name': 'name', 'type': 'string'},
#  {'name': 'age', 'type': 'int64'},
#  {'name': 'favorit', 'type': 'string'}]

Update to temporary table


df.to_gbq(f'{dateset}.{table_name}_temp', project_id, if_exists='replace', table_schema=schema)

Create and execute SQL for update


query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''

# insert must list every column (including the ID);
# update set covers every column except the ID
insert_query = ', '.join(values[1])

for val in values[1][1:]:
  query += f'{val} = S.{val},\n'
query = query[:-2] + '\n'

query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'

client.query(query)

This updated the table without any problems. However, the page below says you should migrate from pandas-gbq to google-cloud-bigquery. https://cloud.google.com/bigquery/docs/pandas-gbq-migration?hl=ja

So, next, I describe the method using google-cloud-bigquery.

④ Create / update table with google-cloud-bigquery

Install pyarrow


#Install pyarrow
!pip install pyarrow

table_id = f'{project_id}.{dateset}.{table_name}'

df = pd.DataFrame(values[3:], columns=values[1])
# Convert blanks to None and drop rows with no ID
df = df.replace({'': None})
df = df.dropna(subset=[values[1][0]])

Creating schema and changing data type of DataFrame


schema = []
for column, dtype in zip(values[1],values[2]):
  schema.append(bigquery.SchemaField(column,dtype))
  if dtype != 'string':
    df[column] = df[column].astype(dtype)
schema

Unlike pandas-gbq, google-cloud-bigquery raises an error if the data types of the table and the DataFrame do not match. The official example handles the type settings in more detail ...

You can check whether the data types were converted with df.dtypes.

dtypes


df.dtypes

# id          int64
# name       object
# age         int64
# favorit    object
# dtype: object

Next, set up the job settings to be passed to load_table_from_dataframe(), which is executed later. (config means "settings".)

Below is a list of attributes that can be set. https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.LoadJobConfig.html

This time, I set schema and write_disposition so that the existing table, if any, is overwritten.

job settings


job_config = bigquery.LoadJobConfig(
  schema = schema,
  write_disposition = 'WRITE_TRUNCATE'
)

Upload df data


job = client.load_table_from_dataframe(
  df, f'{table_id}_temp', job_config=job_config
)

# Wait for the load job to complete.
job.result()

The part below is the same code as in ①, so it is shown all in one block.

Create and execute SQL for update


query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''

# insert must list every column (including the ID);
# update set covers every column except the ID
insert_query = ', '.join(values[1])

for val in values[1][1:]:
  query += f'{val} = S.{val},\n'
query = query[:-2] + '\n'

query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'

client.query(query)
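Since the same merge SQL is built in all four approaches, it could also be factored into a small helper. A sketch (the function name is mine, not from the original code):

def build_merge_query(project_id, dateset, table_name, columns):
    """Build the MERGE statement; columns[0] is treated as the ID column."""
    key = columns[0]
    set_clause = ',\n'.join(f'{col} = S.{col}' for col in columns[1:])
    col_list = ', '.join(columns)
    return f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{key} = S.{key}
when matched then
update set
{set_clause}
when not matched then
insert({col_list})
values({col_list})'''

# usage
query = build_merge_query(project_id, dateset, table_name, values[1])
client.query(query)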

I feel that ③ is better because it is easier to specify the data types. Maybe ④ runs faster?

Another thing I'm concerned about is what happens when the amount of data grows. The insert limit in google-cloud-bigquery was 10,000 records, so if you need to load more than that, you may have to split the processing into batches.

In that case, it seems there is no problem if you replace the table only for the first batch and append from the second batch onward.
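A rough sketch of that idea, using load jobs with an assumed batch size (WRITE_TRUNCATE for the first batch, WRITE_APPEND afterwards):

CHUNK = 10000  # assumed batch size

for start in range(0, len(df), CHUNK):
    mode = 'WRITE_TRUNCATE' if start == 0 else 'WRITE_APPEND'
    job_config = bigquery.LoadJobConfig(schema=schema, write_disposition=mode)
    job = client.load_table_from_dataframe(
        df.iloc[start:start + CHUNK], f'{table_id}_temp', job_config=job_config
    )
    job.result()  # wait for each batch before sending the next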
