What I wanted to do was upload spreadsheet data to BigQuery and reflect only the changed (delta) records.
Originally, the process was performed according to the following flow.
However, with this flow, even when data was inserted immediately after creating a new table, it often did not show up.
When the SQL was executed, the job ID's status showed that execution had completed, yet there was sometimes a considerable lag before the data appeared. Inserting a sleep sometimes made it work and sometimes did not, so I went looking for a stable method.
It was solved by changing the above flow to the following flow.
The original flow created the table and uploaded the data as separate steps; when I changed it so that the table is created and the data uploaded at the same time, it worked fine. In hindsight that may seem obvious, but I didn't know how to write it that way, so after creating the table I kept looking into "how do I detect that execution has completed?" and "how do I wait for it?" instead (a sketch of both is shown below).
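For reference, here is a minimal sketch of those two things, waiting on a query job and polling its state. The project ID, dataset, and table in it are placeholders, not the ones used later in this article.
import time
from google.cloud import bigquery

client = bigquery.Client(project='your-project-id')  # placeholder project ID
job = client.query('create or replace table `your-project-id.your_dataset.person_temp` (id int64)')
# Option 1: block until the job finishes
job.result()
# Option 2: poll the job state yourself and wait in between
while client.get_job(job.job_id, location=job.location).state != 'DONE':
    time.sleep(1)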
This article describes how to reflect only the difference data for records updated in the spreadsheet.
Spreadsheet data is available below.
The data structure is: Japanese column names in the 1st row, BigQuery column names in the 2nd row, data types in the 3rd row, and the actual data from the 4th row onward.
I really wanted to include a primary-key setting as well, but that is out of scope this time; instead, rows whose ID in column A matches an existing row are updated, and rows that do not match are inserted.
The BigQuery table will be updated as follows:
I tried the following four approaches.
① Create table by executing SQL ⇒ Update
② Create a table with google-cloud-bigquery ⇒ Update from pandas
③ Create / update table with pandas-gbq
④ Create / update table with google-cloud-bigquery
① and ② are the patterns where, in the flow I originally planned, the data could not be uploaded after creating the table. ③ and ④ are the patterns that succeeded by uploading the data at the same time the table is created.
I suspect some people run into the same trouble, so I have included ① and ② as well.
The code for each approach is introduced below.
Authentication
from google.colab import auth
auth.authenticate_user()
When you execute the above, the following authentication code will be displayed.
Another article also shows how to do this without an authorization code, but if you want to merge with data from tables in other GCP projects, I think it is better to enter the verification code each time.
Reference: How to run with Google Colaboratory without authorization code
Spreadsheet and BigQuery operations are covered in separate articles, so I will omit the details here:
How to operate spreadsheets in Python (how to use gspread)
How to work with BigQuery in Python
Preparing to use the spreadsheet
!pip install gspread
from google.colab import auth
from oauth2client.client import GoogleCredentials
import gspread
#Authentication process
auth.authenticate_user()
gc = gspread.authorize(GoogleCredentials.get_application_default())
ss_id = 'Spreadsheet ID'
sht_name = 'Sheet name'
workbook = gc.open_by_key(ss_id)
worksheet = workbook.worksheet(sht_name)
values = worksheet.get_all_values()
values
# [['ID', 'name', 'age', 'Favorite things'],
# ['id', 'name', 'age', 'favorit'],
# ['int64', 'string', 'int64', 'string'],
# ['1', 'Bob', '25', 'apple'],
# ['2', 'Tom', '32', 'orange'],
# ['3', 'Jay', '28', 'grape']]
Preparing to use BigQuery
project_id = 'GCP project ID'
dateset = 'Data set name'
table_name = 'table name'
from google.cloud import bigquery
client = bigquery.Client(project=project_id)
Everything up to this point is shared by ① through ④.
① Create table by executing SQL ⇒ Update
I'm not sure whether "executing SQL" is the right way to put it, but this is the method of running the same SQL you would run in the query editor.
I think the merits are that it is easy to write and that you can check the status via the job ID.
Creating the SQL for the temp table
table_id = f'{project_id}.{dateset}.{table_name}'
query = f'create or replace table `{table_id}_temp`(\n'
for column, dtype in zip(values[1], values[2]):
    query += f'{column} {dtype},\n'
query = query[:-2] + ')'
query
# create or replace table `{project_id}.{dateset}.{table_name}_temp`(
# id int64,
# name string,
# age int64,
# favorit string)
SQL execution
#Run
result = client.query(query)
#Check job status
client.get_job(result.job_id, location='asia-northeast1').state
# 'DONE'
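For reference, instead of checking the state yourself, you can simply block until the job finishes with result(); a small sketch using the same client and query as above:
# Block until the query job completes, then check its state
job = client.query(query)
job.result()
job.state
# 'DONE'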
How the table is created differs among ① through ④, but the following SQL for merging the diff is used in common.
SQL creation for update
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''
insert_query = ', '.join(values[1])  # insert all columns, including the ID
for val in values[1][1:]:            # update every column except the ID used as the merge key
    query += f'{val} = S.{val},\n'
query = query[:-2] + '\n'
query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'
query
# merge into `{project_id}.{dateset}.{table_name}` as T
# using `{project_id}.{dateset}.{table_name}_temp` as S
# on T.id = S.id
# when matched then
# update set
# name = S.name,
# age = S.age,
# favorit = S.favorit
# when not matched then
# insert(id, name, age, favorit)
# values(id, name, age, favorit)
SQL execution for update
client.query(query)
After running everything, the query result shown on BigQuery indicates success, as in the image below.
However, looking at the table, it looks like the following.
The job status is DONE, so the data ought to be reflected...
In fact it showed up only after about five minutes, apparently because of timing on the server side. That was no good for me, because I wanted to run the next step immediately afterwards.
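To see whether the MERGE had actually landed, I could also query the target table directly, something like the following sketch (the order by column is just the ID used above):
# Check the current contents of the target table
check_job = client.query(f'select * from `{project_id}.{dateset}.{table_name}` order by {values[1][0]}')
for row in check_job.result():
    print(dict(row))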
② Create a table with google-cloud-bigquery ⇒ Update from pandas
This is the case where the spreadsheet data is uploaded following the steps described here: https://googleapis.dev/python/bigquery/latest/usage/tables.html
Delete table and create schema
table_id = f'{project_id}.{dateset}.{table_name}'
client = bigquery.Client(project = project_id)
#Delete if there is a table
client.delete_table(f'{table_id}_temp', not_found_ok=True)
schema = []
for column, dtype in zip(values[1], values[2]):
    schema.append(bigquery.SchemaField(column, dtype, mode="NULLABLE"))
schema
# [SchemaField('id', 'INT64', 'NULLABLE', None, ()),
# SchemaField('name', 'STRING', 'NULLABLE', None, ()),
# SchemaField('age', 'INT64', 'NULLABLE', None, ()),
# SchemaField('favorit', 'STRING', 'NULLABLE', None, ())]
Creating a temp table
table = bigquery.Table(f'{table_id}_temp', schema=schema)
client.create_table(table)
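As an aside, instead of deleting the table first, create_table can also be told to tolerate an existing table; a sketch is below. Note that unlike delete-and-create, this keeps the existing table and its data as they are.
# Sketch: no error is raised if the temp table already exists
table = bigquery.Table(f'{table_id}_temp', schema=schema)
client.create_table(table, exists_ok=True)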
The rest is the same code as in ①, so it is shown all at once.
Create and execute SQL for update
import pandas as pd

df = pd.DataFrame(values[3:], columns=values[1])
# Replace empty strings with None, then drop rows that are missing the ID
df = df.replace({'': None})
df = df.dropna(subset=[values[1][0]])
# Insert the rows into the temp table
client.insert_rows(table, df.values.tolist())
#Creating SQL for update
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''
insert_query = ', '.join(values[1])  # insert all columns, including the ID
for val in values[1][1:]:            # update every column except the ID used as the merge key
    query += f'{val} = S.{val},\n'
query = query[:-2] + '\n'
query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'
#Run SQL to update data
client.query(query)
Even with this, just as in ①, the data often failed to end up in the person_temp table.
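As a small check, insert_rows returns a list of per-row errors, so you can at least confirm that the streaming insert request was accepted; a sketch (it does not detect the reflection delay itself):
# insert_rows returns per-row errors; an empty list means the request was accepted
errors = client.insert_rows(table, df.values.tolist())
if errors:
    print('insert errors:', errors)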
I used to use this code a lot when linking with DP; back then I made it work by putting in a sleep of about 20 seconds.
I didn't expect much of it, since another blog had described it as bad practice, but I'm glad I tried it.
③ Create / update table with pandas-gbq
Creating a DataFrame
#Requires installation to use
!pip install pandas-gbq
df = pd.DataFrame(values[3:], columns=values[1])
# Replace empty strings with None, then drop rows that are missing the ID
df = df.replace({'': None})
df = df.dropna(subset=[values[1][0]])
Here the schema is created as a list of dictionaries with name and type keys.
Creating schema
schema = []
for column, dtype in zip(values[1], values[2]):
    schema.append({'name': column, 'type': dtype})
schema
# [{'name': 'id', 'type': 'int64'},
# {'name': 'name', 'type': 'string'},
# {'name': 'age', 'type': 'int64'},
# {'name': 'favorit', 'type': 'string'}]
Update to temporary table
df.to_gbq(f'{dateset}.{table_name}_temp', project_id, if_exists='replace', table_schema=schema)
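For reference, if_exists also accepts 'fail' and 'append'; appending to the temp table instead of replacing it would look like this (a sketch):
# Append to the temp table instead of replacing it
df.to_gbq(f'{dateset}.{table_name}_temp', project_id, if_exists='append', table_schema=schema)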
Create and execute SQL for update
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''
insert_query = ', '.join(values[1])  # insert all columns, including the ID
for val in values[1][1:]:            # update every column except the ID used as the merge key
    query += f'{val} = S.{val},\n'
query = query[:-2] + '\n'
query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'
client.query(query)
I was able to update here without any problems.
However, the page below says you should migrate from pandas-gbq to google-cloud-bigquery.
https://cloud.google.com/bigquery/docs/pandas-gbq-migration?hl=ja
So, next, I describe the method using google-cloud-bigquery.
④ Create / update table with google-cloud-bigquery
Install pyarrow
#Install pyarrow
!pip install pyarrow
table_id = f'{project_id}.{dateset}.{table_name}'
df = pd.DataFrame(values[3:], columns=values[1])
# Replace empty strings with None, then drop rows that are missing the ID
df = df.replace({'': None})
df = df.dropna(subset=[values[1][0]])
Creating schema and changing data type of DataFrame
schema = []
for column, dtype in zip(values[1], values[2]):
    schema.append(bigquery.SchemaField(column, dtype))
    if dtype != 'string':
        df[column] = df[column].astype(dtype)
schema
Unlike pandas-gbq, google-cloud-bigquery raises an error if the data types of the table and the DataFrame do not match. The official library lets you configure things in more detail...
You can check whether the data types were converted with df.dtypes.
dtypes
df.dtypes
# id int64
# name object
# age int64
# favorit object
# dtype: object
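If the sheet ever contains more data types than int64 and string, one way to generalize the conversion is a small mapping like the sketch below; the mapping itself is my own assumption and only covers the types I would expect here.
# Hypothetical mapping from BigQuery types to pandas dtypes (assumption, not from the original)
bq_to_pandas = {'int64': 'int64', 'float64': 'float64'}
for column, dtype in zip(values[1], values[2]):
    if dtype in bq_to_pandas:
        df[column] = df[column].astype(bq_to_pandas[dtype])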
Next, set up the configuration to be passed to load_table_from_dataframe(), which is executed later.
Below is a list of attributes that can be set. https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.LoadJobConfig.html
This time I set schema and write_disposition so that any existing table is overwritten.
Job settings
job_config = bigquery.LoadJobConfig(
schema = schema,
write_disposition = 'WRITE_TRUNCATE'
)
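The same setting can also be written with the WriteDisposition constant instead of the string, which I find a little easier to look up (a sketch):
# Equivalent job settings using the enum-style constant
job_config = bigquery.LoadJobConfig(
    schema=schema,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)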
Upload df data
job = client.load_table_from_dataframe(
df, f'{table_id}_temp', job_config=job_config
)
# Wait for the load job to complete.
job.result()
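To confirm the load, you can check the row count of the temp table, for example like this sketch:
# Confirm the load by checking the temp table's row count
temp_table = client.get_table(f'{table_id}_temp')
print(f'{temp_table.num_rows} rows in {temp_table.table_id}')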
The rest is the same code as in ①, so it is shown all at once.
Create and execute SQL for update
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''
insert_query = ', '.join(values[1])  # insert all columns, including the ID
for val in values[1][1:]:            # update every column except the ID used as the merge key
    query += f'{val} = S.{val},\n'
query = query[:-2] + '\n'
query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'
client.query(query)
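If you want to confirm the MERGE as well, the query job exposes how many rows it affected; a sketch:
# Wait for the MERGE and check how many rows it touched
merge_job = client.query(query)
merge_job.result()
print(merge_job.num_dml_affected_rows)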
My impression is that ③ is easier because of how the data types are specified; perhaps ④ runs faster?
Another thing I'm concerned about is what happens when the amount of data grows. The insert limit in google-cloud-bigquery was 10,000 records per request, so anything beyond that probably has to be processed in chunks.
In that case it seems fine to replace the table only for the first chunk and append from the second chunk onward.
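As a rough sketch of that idea, assuming the df, table_id, schema, and client from above and the 10,000-record figure mentioned here:
chunk_size = 10000  # the limit mentioned above
for i, start in enumerate(range(0, len(df), chunk_size)):
    chunk = df.iloc[start:start + chunk_size]
    job_config = bigquery.LoadJobConfig(
        schema=schema,
        # Replace the table with the first chunk, append the rest
        write_disposition='WRITE_TRUNCATE' if i == 0 else 'WRITE_APPEND',
    )
    client.load_table_from_dataframe(chunk, f'{table_id}_temp', job_config=job_config).result()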