Manipulate BigQuery tables from a Python client

Advance preparation

1. Obtaining credentials

Create a service account for using the BigQuery API in advance and download its credentials (JSON). https://cloud.google.com/docs/authentication/getting-started?hl=ja

2. Installation of required packages

Pipfile


[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]

[packages]
google-cloud-bigquery = "*"
google-cloud-bigquery-datatransfer = "*"

[requires]
python_version = "3.8"

Recreate the table (Drop⇒Create)

1. Client initialization

migrate_table.py


from google.cloud import bigquery
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    '[PATH TO CREDENTIAL]',
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = bigquery.Client(
    credentials=credentials,
    project=credentials.project_id,
)

Replace [PATH TO CREDENTIAL] with the path to the credentials JSON file.
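As a quick check that the client is authenticated correctly, you can list the datasets in the project. A minimal sketch (it simply prints whatever datasets exist):

# Smoke test: list the datasets visible to the authenticated client
for dataset in client.list_datasets():
    print(dataset.dataset_id)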

2. Regenerate the table

migrate_table.py


table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')

# Delete the existing table
client.delete_table(table, not_found_ok=True)

# Create the table
schema = [
    bigquery.SchemaField('id', 'INT64'),
    bigquery.SchemaField('name', 'STRING'),
]
client.create_table(bigquery.Table(table, schema=schema))

Replace [DATASET NAME] and [TABLE NAME] with the dataset name and the name of the table to create.

If not_found_ok is False, deleting a table that does not exist raises an error, so leave it set to True. Think of it as DROP TABLE IF EXISTS in DDL.
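If you prefer DDL, the same delete can also be written as a query. A minimal sketch, with dataset_name.table_name as a placeholder:

# Equivalent DDL: drops the table only if it exists
client.query('DROP TABLE IF EXISTS dataset_name.table_name').result()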

When creating a table, you specify the column names and types as the schema definition. Columns are Nullable by default, so to make a column Not Null, specify the mode:

bigquery.SchemaField('id', 'INT64', mode='REQUIRED')
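For example, a schema that mixes Not Null and Nullable columns looks like this (the same id/name columns as above):

schema = [
    bigquery.SchemaField('id', 'INT64', mode='REQUIRED'),  # Not Null
    bigquery.SchemaField('name', 'STRING'),  # Nullable by default
]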

3. Import data

Load CSV data as the initial data. Prepare the CSV in advance so that it matches the types in the schema definition.

import.csv


id,name
1,hogehoge
2,fugafuga

migrate_table.py


# Import the initial data
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1
with open('import.csv', 'rb') as source_file:
    job = client.load_table_from_file(source_file, table, job_config=job_config)
job.result()

To skip the CSV header, specify the number of rows to skip with skip_leading_rows.
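Once job.result() has returned, you can verify the load. A quick sanity check using the load job and table reference from above:

# Rows written by this load job
print(job.output_rows)

# Total rows now in the destination table
print(client.get_table(table).num_rows)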

Recreate the view

migrate_view.py


from google.cloud import bigquery
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    '[PATH TO CREDENTIAL]',
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = bigquery.Client(
    credentials=credentials,
    project=credentials.project_id,
)

table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')

# Delete the existing view, then recreate it
client.delete_table(table, not_found_ok=True)
view = bigquery.Table(table)
view.view_query = 'SELECT * FROM dataset_name.table_name'
client.create_table(view)

For a view, the flow is almost the same as for a regular table. The difference is that for a view you specify the SQL query directly in view_query. The schema definition is built automatically from the SQL query and does not need to be specified.
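To confirm that the view works, query it like an ordinary table. A minimal sketch, with dataset_name.view_name as a placeholder for the view created above:

# Views are resolved at query time, so this reflects the current view_query
for row in client.query('SELECT * FROM dataset_name.view_name LIMIT 10').result():
    print(row)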

Update the schedule

Use this when data is refreshed periodically with Scheduled Query and you want to change the executed query or the schedule settings.

migrate_schedule.py



from google.cloud import bigquery_datatransfer_v1
from google.oauth2 import service_account
import google.protobuf.json_format

credentials = service_account.Credentials.from_service_account_file(
    '[PATH TO CREDENTIAL]',
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = bigquery_datatransfer_v1.DataTransferServiceClient(
    credentials=credentials
)

config = google.protobuf.json_format.ParseDict(
    {
        "name": '[RESOURCE NAME]',
        "destination_dataset_id": '[DATASET NAME]',
        "display_name": '[DISPLAY NAME]',
        "data_source_id": "scheduled_query",
        "params": {
            "query": "SELECT * FROM dataset_name.table_name",
            "destination_table_name_template": '[TABLE NAME]',
            "write_disposition": "WRITE_TRUNCATE",
            "partitioning_field": "",
        },
        "schedule": "every 24 hours",
    },
    bigquery_datatransfer_v1.types.TransferConfig(),
)

update_mask = {"paths": ["display_name", "params", "schedule"]}

response = client.update_transfer_config(
    config, update_mask
)

Pass the schedule settings (config) and the update mask (update_mask) to update_transfer_config. update_mask specifies which fields of config to update; settings not listed in update_mask are left unchanged.

The details of each setting in config are as follows.

name: Specify the resource name of the schedule. The resource name is shown in the list of schedules under "Scheduled Query" in the BigQuery console, so you can check it in the configuration details of the target schedule.

destination_dataset_id: Specify the dataset where the results of the scheduled query are saved.

display_name: The display name of the schedule; give it any name you like.

params.query: Specify the query to execute.

params.destination_table_name_template: Specify the name of the table where the results of the scheduled query are saved.

params.write_disposition: Specify how results are written to the table. With WRITE_TRUNCATE, the existing table is replaced by the result of the executed query. With WRITE_APPEND, the result of the executed query is appended to the existing table.

schedule: Specify when the schedule runs. For example, "every 1 hour" runs every hour, and "every day 00:00" runs at midnight every day.
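For example, to change only the execution timing and leave the query as it is, restrict the update mask to the schedule field. A sketch reusing the placeholders above:

config = google.protobuf.json_format.ParseDict(
    {
        "name": '[RESOURCE NAME]',
        "schedule": "every day 00:00",
    },
    bigquery_datatransfer_v1.types.TransferConfig(),
)

# Only the fields listed in the mask are updated; the rest are kept
response = client.update_transfer_config(config, {"paths": ["schedule"]})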

In conclusion

When using BigQuery as a data mart, changes are hard to track if you create tables from the console, so it is recommended to write them as code like this and manage them with git.
