- Data scientists usually do their analysis in Jupyter.
- So there is naturally a desire to work with databases from Jupyter as well.
- For BigQuery, it is therefore more convenient to use the client library from Jupyter than the web UI or the REST API.
- To that end, I decided to investigate what the official library google.cloud.bigquery can do.
- The following is a summary of how to create tables in BigQuery.
from google.cloud import bigquery
#Specify your own GCP Project ID
project_id = 'YourProjectID'
client = bigquery.Client(project=project_id)
If you don't know how to authenticate from Colaboratory, I wrote an article about it earlier, so please refer to that.
If you run the code in a GCE environment, authentication should pass by default in the first place.
If you want to access BigQuery from any other environment, create a service account JSON key and load it for authentication, as described in the official API reference below.
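As a minimal sketch of that last case (the key path below is just a placeholder, not something from this article):

```python
from google.cloud import bigquery

# Authenticate explicitly with a service account JSON key file
client = bigquery.Client.from_service_account_json(
    "/path/to/your-service-account-key.json",  # placeholder path
    project="YourProjectID",
)
```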
-reference- Three ways to access BigQuery with Colaboratory
Official API Reference: google.cloud.bigquery (Ver. 1.20.0)
Needless to say, please install it like this
pip install google-cloud-bigquery==1.20.0
The DataSet is assumed to be created in the US region.
**If your project already has a DataSet, you can skip this part completely.** Even if you don't have one yet, you rarely recreate DataSets again and again, so once you have run this once you can pretty much forget about these functions.
By the way, a BigQuery DataSet corresponds to the "schema" of other databases. In BigQuery, however, the word "schema" has a different meaning (the table definition), so the term DataSet is used here instead.
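If you are not sure whether the DataSet already exists, here is a minimal sketch for checking first (it reuses the client created above; NotFound comes from google.cloud.exceptions):

```python
from google.cloud.exceptions import NotFound

# Check whether the "demo" DataSet already exists before creating it
dataset_ref = client.dataset("demo")
try:
    client.get_dataset(dataset_ref)
    print("DataSet already exists, skip creation")
except NotFound:
    print("DataSet not found, create it as shown below")
```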
Create Dataset
#Create a DataSet named [demo]
dataset_name = "demo"
dataset_id = "{}.{}".format(client.project, dataset_name)
dataset = bigquery.Dataset(dataset_id)
#US is the cheapest location, so I always use it. Change it if you care about the region.
dataset.location = "US"
client.create_dataset(dataset)
The process of creating a table and loading data into it

- Create a table
- Check the table
- Import data into the table
- Export data from the table
- Delete the table

Basically everything is written in the official reference, but well... yup... it doesn't hurt to write it up on Qiita too...
-reference- Managing Tables / Using data definition language statements (https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language?hl=ja)
Here we will walk through code examples that assume a purchase history table with the following columns.

| # | Column name | Type | Mode | Comment |
|---|---|---|---|---|
| 1 | TRANSACTION_ID | STRING | REQUIRED | Purchase history ID |
| 2 | ORDER_TS | TIMESTAMP | REQUIRED | Purchase time |
| 3 | ORDER_DT | DATE | REQUIRED | Purchase date |
| 4 | ITEM_CODE | STRING | REQUIRED | Product ID |
| 5 | ITEM_NAME | STRING | NULLABLE | Product name |
| 6 | QUANTITY | INTEGER | NULLABLE | Purchase quantity |
| 7 | AMOUNT | FLOAT | NULLABLE | Purchase price |
| 8 | DISCOUNT | FLOAT | NULLABLE | Discount amount |
| 9 | CUSTOMER_ID | STRING | REQUIRED | Customer ID |
| 10 | ITEM_TAG | RECORD | REPEATED | Product tag list |
| 10.1 | TAG_ID | STRING | NULLABLE | Tag ID |
| 10.2 | TAG_NAME | STRING | NULLABLE | Tag name |
BigQuery treats the table definition as a "schema", so each column definition is passed to bigquery.SchemaField.
The field name and type cannot be omitted.
The tag information is defined as a nested (RECORD / REPEATED) field.
from google.cloud import bigquery
#Define Schema
schema = [
bigquery.SchemaField('TRANSACTION_ID', 'STRING', mode='REQUIRED', description='Purchase history ID'),
bigquery.SchemaField('ORDER_TS', 'TIMESTAMP', mode='REQUIRED', description='Purchase time'),
bigquery.SchemaField('ORDER_DT', 'DATE', mode='REQUIRED', description='Purchase date'),
bigquery.SchemaField('ITEM_CODE', 'STRING', mode='REQUIRED', description='Product ID'),
bigquery.SchemaField('ITEM_NAME', 'STRING', mode='NULLABLE', description='Product name'),
bigquery.SchemaField('QUANTITY', 'INTEGER', mode='NULLABLE', description='Purchase quantity'),
bigquery.SchemaField('AMOUNT', 'FLOAT', mode='NULLABLE', description='Purchase price'),
bigquery.SchemaField('DISCOUNT', 'FLOAT', mode='NULLABLE', description='Discount amount'),
bigquery.SchemaField('CUSTOMER_ID', 'STRING', mode='REQUIRED', description='Customer ID'),
bigquery.SchemaField('ITEM_TAG', 'RECORD', mode='REPEATED', description='Tag information',
fields= [
bigquery.SchemaField('TAG_ID', 'STRING', mode='NULLABLE', description='Tag ID'),
bigquery.SchemaField('TAG_NAME', 'STRING', mode='NULLABLE', description='Tag name'),
]
)
]
After defining the schema, the next step is to actually create the table.
Other factors to consider besides the schema:

- Make it a partitioned table (if you plan to store data spanning more than 2,000 days, it may be better not to partition by date)
- Make it a clustered table (note that clustering can only be applied to partitioned tables)

-reference- What I got stuck on when creating a partitioned table in BigQuery
This time we will create the table as a partitioned & clustered table.
Create table
from google.cloud import bigquery
# project_id = 'YourProjectID'
# client = bigquery.Client(project=project_id)
# dataset_name = "demo"
# dataset_id = "{}.{}".format(client.project, dataset_name)
#Decide on a table name
table_name = "demo_transaction"
table_id = "{}.{}".format(dataset_id, table_name)
#Use the schema defined above
table = bigquery.Table(table_id, schema=schema)
#Partitioning settings (partition by ORDER_DT)
table.time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field="ORDER_DT"
)
#Cluster table settings
table.clustering_fields = ["ITEM_CODE", "CUSTOMER_ID"]
table.description = "Demo Data"
#Execute table creation
table = client.create_table(table)
If you check it in the web console, you can see that the table has been created with this definition.
To list the tables, you can either pass the DataSet name or a DataSet object to client.list_tables.
#Check the tables in the [demo] DataSet
#Pattern that passes the DataSet name
dataset_id = "demo"
for table in client.list_tables(dataset=dataset_id):
    print(table.table_id)

#Pattern that passes a DataSet object
dataset_object = client.get_dataset("demo")
for table in client.list_tables(dataset=dataset_object):
    print(table.table_id)
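If you also want to check an individual table's definition from code rather than the console, a small sketch with client.get_table (using the demo_transaction table created above) could look like this:

```python
# Fetch the table metadata and inspect its schema and options
table = client.get_table(client.dataset("demo").table("demo_transaction"))

print(table.table_id)
print(table.num_rows)
print(table.time_partitioning)   # partitioning settings, if any
print(table.clustering_fields)   # clustering fields, if any
for field in table.schema:
    print(field.name, field.field_type, field.mode)
```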
Data scientists import and export data all the time, so I want to get a good grasp of this part as well.

- Import a local file
  - Read CSV
  - Read JSON
- Import a file from GCS

I will describe two patterns: one where the file is stored as CSV and one where it is stored as JSON.
Keeping data in CSV files is probably the most common case, so let's start with that pattern. However, CSV cannot express nested information, so it cannot be used for tables with nested fields.
Let's try loading data into the table without nested columns, here named demo_transaction_csv.
Import csv
#Specify a local file
filename = 'demo_transaction.csv'
#Data set name
dataset_id = "demo"
#Table name to be imported
table_id = "demo_transaction_csv"
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)
#Settings for importing
job_config = bigquery.LoadJobConfig()
#Specify that CSV is the source
job_config.source_format = bigquery.SourceFormat.CSV
#Skip the first line if the file contains headers
job_config.skip_leading_rows = 1
with open(filename, "rb") as source_file:
    job = client.load_table_from_file(source_file, table_ref, job_config=job_config)
#Run
job.result()
By the way, if the job fails for some reason, check the error details with job.errors, fix the data, and load it again.
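As a minimal sketch of that error-checking pattern (just one way to wrap the job.result() call above):

```python
from google.cloud import exceptions

try:
    job.result()  # wait for the load job to finish
except exceptions.GoogleCloudError:
    # job.errors holds a list of dicts describing what went wrong
    for error in job.errors:
        print(error)
```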
Import a table with nested data using JSON
The JSON format that can be imported is fixed: it must be newline-delimited JSON, i.e. one record per line, as shown below.
json format
{"TRANSACTION_ID":"t0001","ORDER_TS":"2019-11-02 12:00:00 UTC","ORDER_DT":"2019-11-02","ITEM_CODE":"ITEM001","ITEM_NAME":"YYYYY1","QUANTITY":"29","AMOUNT":2200,"DISCOUNT":0,"CUSTOMER_ID":"F0002","ITEM_TAG":[{"TAG_ID":"XXX1", "TAG_NAME":"XYZ1"},{"TAG_ID":"XXX2", "TAG_NAME":"XYZ2"}]}
{"TRANSACTION_ID":"t0002","ORDER_TS":"2019-11-03 12:00:00 UTC","ORDER_DT":"2019-11-03","ITEM_CODE":"ITEM002","ITEM_NAME":"YYYYY2","QUANTITY":"35","AMOUNT":5700,"DISCOUNT":0,"CUSTOMER_ID":"F0002","ITEM_TAG":[]}
{"TRANSACTION_ID":"t0003","ORDER_TS":"2019-11-04 12:00:00 UTC","ORDER_DT":"2019-11-04","ITEM_CODE":"ITEM003","ITEM_NAME":"YYYYY3","QUANTITY":"48","AMOUNT":4200,"DISCOUNT":0,"CUSTOMER_ID":"F0002","ITEM_TAG":[{"TAG_ID":"XXX3", "TAG_NAME":"XYZ3"}]}
If you have JSON data in this form, the local file can be imported like this.
Import json
#Specify a local file name
filename = 'demo_transaction.json'
#Data set name
dataset_id = "demo"
#Table name with nested information
table_id = "demo_transaction"
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)
job_config = bigquery.LoadJobConfig()
#Tell BigQuery that the source file is newline-delimited JSON
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
with open(filename, "rb") as source_file:
    job = client.load_table_from_file(source_file, table_ref, job_config=job_config)
#Run
job.result()
By the way, since the table contains nested data, you can see the RECORD / REPEATED structure when you look at it on the console.
-reference- Gist: Try Google BigQuery JSON input lightly
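Querying is not the focus of this article, but as a hedged sketch, the nested ITEM_TAG field can be flattened with UNNEST when you want to peek at it (replace {YourProjectID} as usual):

```python
# Flatten the repeated ITEM_TAG records with UNNEST
query = """
SELECT
    TRANSACTION_ID
    , TAG.TAG_ID
    , TAG.TAG_NAME
FROM
    `{YourProjectID}.demo.demo_transaction`
    , UNNEST(ITEM_TAG) AS TAG
LIMIT 10
;
"""
df = client.query(query, location="US").to_dataframe()
print(df.head())
```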
There are times when you want to import a local file into BigQuery, but since you are on GCP you should make full use of GCS.
So let's check how to import data that is stored in GCS.
Nicely, you don't need to call any GCS-related libraries as long as you know the path of the file in GCS.
Here is an example that imports a CSV file into the demo_transaction_csv table:
Load from GCS
#Specify dataset and table name
dataset_id = "demo"
table_id = "demo_transaction_csv"
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)
#Since CSV is loaded, various settings
job_config = bigquery.LoadJobConfig()
job_config.skip_leading_rows = 1
job_config.source_format = bigquery.SourceFormat.CSV
#Specify the path where the GCS file is located
uri = "gs://{yourbucketname}/demo_transaction.csv"
#Generate job
load_job = client.load_table_from_uri(
uri, table_ref, job_config=job_config
)
#Run load
load_job.result()
Although it is not part of the official API, you can also insert pd.DataFrame data into a BigQuery table using a function on the pandas side (to_gbq).
You can append to an existing table, but I feel it is more often used to write out a DataFrame as a new table after various processing.
As an example, let's pull out part of the data of the demo.demo_transaction_csv table created earlier and write it out as another table.
Export a DataFrame with to_gbq
#Prepare a query to get a part of the data
query = """
SELECT
TRANSACTION_ID
, ORDER_TS
, ITEM_CODE
, QUANTITY
, AMOUNT
FROM
`{YourProjectID}.demo.demo_transaction_csv`
LIMIT 200
;
"""
#Generate query job
query_job = client.query(
query, location='US'
)
#Receive results in dataframe
df = query_job.to_dataframe()
#Export the DataFrame as a table named [demo_transaciton_csv_extracted]
# if_exists: append  -> if the table already exists, append to it; otherwise create it
# if_exists: fail    -> if the table already exists, fail; otherwise create it
# if_exists: replace -> if the table already exists, replace it; otherwise create it
dataset_id = "demo"
table_id = "demo_transaciton_csv_extracted"
df.to_gbq(destination_table='{dataset}.{table}'.format(dataset=dataset_id, table=table_id),project_id=project_id, if_exists='append')
Make sure the import worked.
Check if the table contains data
dataset_id = "demo"
dataset_ref = client.dataset(dataset_id)
table_id = "demo_transaciton_csv_extracted"
table_ref = dataset_ref.table(table_id)
table = client.get_table(table_ref)
print("Table has {} rows".format(table.num_rows))
> Table has 200 rows
I described the unorthodox way first, but it is also possible to insert a DataFrame with the official API.
The sample code below defines a schema, but it can also be executed without defining one.
Import a pd.DataFrame
import pandas as pd
dataset_id = "demo"
dataset_ref = client.dataset(dataset_id)
table_id = "demo_pandas_data"
table_ref = dataset_ref.table(table_id)
#Create some pd.DataFrame data
rows = [
{"item_id": "xx1", "quantity": 1},
{"item_id": "xx2", "quantity": 2},
{"item_id": "xx3", "quantity": 3},
]
dataframe = pd.DataFrame(
rows,
columns=["item_id", "quantity"]
)
#Define schema(You can import without it, but it is safer to write it)
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("item_id", "STRING"),
bigquery.SchemaField("quantity", "INTEGER"),
],
)
#Load the pd.DataFrame data into the table
job = client.load_table_from_dataframe(
dataframe,
table_ref,
job_config=job_config,
location="US",
)
#Run
job.result()
Above I took the unorthodox route of pulling data out of an existing table via a DataFrame and writing it as a new table, but basically I want to do this with official functions.

- Extract data from the existing table with an API function and write it to a new table
- Write the new table directly from a query

When writing with the API functions, you just specify the new table name in QueryJobConfig.destination.
**Simple!!!**
Write query results by specifying the destination table
#Generate job config
job_config = bigquery.QueryJobConfig()
dataset_id = "demo"
dataset_ref = client.dataset(dataset_id)
#Define the table name to write to
table_id = "demo_transaciton_csv_direct_extracted"
table_ref = dataset_ref.table(table_id)
# (important)Specify the table to write to
job_config.destination = table_ref
#Define the query to write to
query = """
SELECT
TRANSACTION_ID
, ORDER_TS
, ITEM_CODE
, QUANTITY
, AMOUNT
FROM
`{YourProjectID}.demo.demo_transaction_csv`
LIMIT 300
;
"""
#Generate query job
query_job = client.query(
query,
location="US",
job_config=job_config,
)
#Run
query_job.result()
I feel the pattern of defining the new table with QueryJobConfig.destination is sufficient, but let's also cover the familiar method: CREATE TABLE ... AS SELECT.
After all, I end up using it surprisingly often...
Create a new table with a query
dataset_id = "demo"
#Define the table name to write to
table_id = "demo_transaciton_csv_as_select"
#Define the query to write to
query = """
DROP TABLE IF EXISTS {dataset}.{table} ;
CREATE TABLE {dataset}.{table} AS
SELECT
TRANSACTION_ID
, ORDER_TS
, ITEM_CODE
, QUANTITY
, AMOUNT
FROM
`{project}.demo.demo_transaction_csv`
LIMIT 400
;
""".format(project=client.project, dataset=dataset_id, table=table_id)
#Generate query job
job_config = bigquery.QueryJobConfig()
query_job = client.query(
query,
location="US",
job_config=job_config,
)
#Run(Of course nothing is returned but it is written properly)
query_job.result()
This should cover all the ways to import data...
BigQuery is a column-based, pay-as-you-go service: you are billed for the amount of data scanned, so adding LIMIT or WHERE does not by itself reduce the bill.
It doesn't matter much while the data volume is small (the first 1 TB of queries per month is free), but when dealing with tens of TB or more, you need to be careful.
So what should you do? Using partitioned tables is the basic remedy.
A table holding several TB of data almost always contains some time-series column, so use that column as the partitioning field and create a partitioned table.
Note that unless you define the table as partitioned when you create it, you cannot change this afterwards.
-reference- Overview of partitioned tables
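As a hedged illustration of why this matters, a dry run reports how many bytes a query would scan without actually running it; on a partitioned table, filtering on the partition column should shrink that number. The sketch below assumes the partitioned table created a bit further down (demo_transaciton_csv_as_select_time_partition):

```python
# Dry run: estimate the scanned bytes without executing the query
job_config = bigquery.QueryJobConfig()
job_config.dry_run = True
job_config.use_query_cache = False

query = """
SELECT
    TRANSACTION_ID
FROM
    `{YourProjectID}.demo.demo_transaciton_csv_as_select_time_partition`
WHERE
    ORDER_DT = '2019-11-02'
;
"""
query_job = client.query(query, location="US", job_config=job_config)
print("This query will process {} bytes.".format(query_job.total_bytes_processed))
```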
First, the pattern that sets the partitioning option at table creation time.
Set partitioning options
#Table definition (a time-series column is required)
schema = [
bigquery.SchemaField('TRANSACTION_ID', 'STRING', mode='REQUIRED', description='Purchase history ID'),
bigquery.SchemaField('ORDER_TS', 'TIMESTAMP', mode='REQUIRED', description='Purchase time'),
bigquery.SchemaField('ORDER_DT', 'DATE', mode='REQUIRED', description='Purchase date'),
]
dataset_id = "demo"
table_id = "demo_transaction_time_partition1"
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)
#Table object creation
table = bigquery.Table(table_ref, schema=schema)
#Set partitioning options
table.time_partitioning = bigquery.TimePartitioning(
#Partition by day
type_=bigquery.TimePartitioningType.DAY,
#Set the target field
field="ORDER_DT"
)
table.description = "Time Partition Data"
#Create the partitioned table
table = client.create_table(table)
You can also create a partitioned table from an existing table with CREATE TABLE [TABLE_NAME] AS SELECT.
The best use case is **recreating a bloated table that was not set up as a partitioned table**.
Just put `PARTITION BY [time partition field]` before the `AS SELECT`.
Table creation query with a partitioning option
dataset_id = "demo"
#Define the table name to write to
table_id = "demo_transaciton_csv_as_select_time_partition"
query = """
DROP TABLE IF EXISTS {dataset}.{table} ;
CREATE TABLE {dataset}.{table}
PARTITION BY
ORDER_DT
AS
SELECT
TRANSACTION_ID
, ORDER_TS
, ORDER_DT
, ITEM_CODE
, QUANTITY
, AMOUNT
FROM
`{project}.demo.demo_transaction_csv`
LIMIT 500
;
""".format(project=client.project, dataset=dataset_id, table=table_id)
#Generate query job
query_job = client.query(
query,
location="US"
)
#Run
query_job.result()
Easy!!
You can additionally set clustering fields on a partitioned table.
Since the only change compared to the partitioned-table options is the clustering field, only an excerpt is shown.
-reference- Creating and using clustered tables (https://cloud.google.com/bigquery/docs/creating-clustered-tables?hl=ja)
Please refer to the following for the effect of setting the cluster option.
[BigQuery] Clustered Table Survey
Specify additional cluster fields
"""Cluster table must be a split table
table = bigquery.Table(table_ref, schema=schema)
table.time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field="ORDER_DT"
)
"""
table.clustering_fields = ["ITEM_CODE", "CUSTOMER_ID"]
Even when using SQL, just add the clustering option with CLUSTER BY.
Excerpt of the part that adds the clustering option
query = """
DROP TABLE IF EXISTS {dataset}.{table} ;
CREATE TABLE {dataset}.{table}
PARTITION BY
ORDER_DT
CLUSTER BY
ITEM_CODE, CUSTOMER_ID
AS
SELECT
*
FROM
`{project}.demo.demo_transaction_csv`
LIMIT 500
;
""".format(project=client.project, dataset=dataset_id, table=table_id)
Phew... The part about getting data in is finally over...

Next is the export part. Exporting a table itself basically means extracting it to GCS.
Export the contents of a table by specifying a GCS bucket.
If you do not specify job_config, the table is written out as a CSV file.
Since the default is CSV, tables containing nested columns cannot be exported this way (they need JSON or Avro, as described later).
Export in csv format
#Specify the table to be exported
dataset_id = "demo"
dataset_ref = client.dataset(dataset_id)
table_id = "demo_transaction_csv"
table_ref = dataset_ref.table(table_id)
#Store files in specified bucket
bucket_name = "{Your Bucket Name}"
output_name = "{}.csv".format(table_id)
destination_uri = "gs://{}/{}".format(bucket_name, output_name)
#Generate export job
extract_job = client.extract_table(
table_ref,
destination_uri,
location="US",
)
#Run
extract_job.result()
If you export the table as is, the output file can get large, so I want to set the compression option.
Output options such as compression are set through ExtractJobConfig.
You can control whether the header is written with the print_header option (the default is True).
Add the compression option (compress with gzip)
destination_uri = "gs://{YourBucket}/{filename}.gz"
job_config = bigquery.ExtractJobConfig(
compression="GZIP",
print_header=True
)
#Generate export job
extract_job = client.extract_table(
table_ref,
destination_uri,
job_config=job_config,
location="US",
)
#Run
extract_job.result()
If the table has nested columns, it cannot be exported as CSV, so export it as JSON or Avro.
JSON can be compressed, but Avro does not support the compression option.
Export a nested table as JSON (or Avro)
output_name = "{}.json".format(table_id)
destination_uri = "gs://{}/{}".format(bucket_name, output_name)
#Export with json(Do not output header)
job_config = bigquery.ExtractJobConfig(
destination_format = "NEWLINE_DELIMITED_JSON",
print_header = False
)
#Run
extract_job = client.extract_table(
table_ref,
destination_uri,
job_config=job_config,
)
extract_job.result()
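For Avro, a hedged sketch only needs a different destination_format; the rest of the job setup stays the same as above:

```python
# Export the nested table as Avro instead of JSON
output_name = "{}.avro".format(table_id)
destination_uri = "gs://{}/{}".format(bucket_name, output_name)

job_config = bigquery.ExtractJobConfig(
    destination_format="AVRO"
)
extract_job = client.extract_table(
    table_ref,
    destination_uri,
    job_config=job_config,
)
extract_job.result()
```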
By the way, the default is CSV, but you can also export as TSV.
Set the TSV option
#Add the delimiter option to job_config
job_config = bigquery.ExtractJobConfig(
field_delimiter="\t"
)
If you want to delete the table, just specify the table name
Delete table
# from google.cloud import bigquery
# project_id = 'YourProjectID'
# client = bigquery.Client(project=project_id)
dataset_id = "{YourDataSetId}"
dataset_ref = client.dataset(dataset_id)
table_id = "{YourTableId}"
table_ref = dataset_ref.table(table_id)
#Delete table
client.delete_table(table_ref)
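If you want the delete to succeed quietly even when the table is already gone, one hedged option is the not_found_ok argument (added around library version 1.15, so check your version):

```python
# Delete the table, ignoring the error if it does not exist
client.delete_table(table_ref, not_found_ok=True)

# Deleting a whole DataSet together with its tables is also possible:
# client.delete_dataset(dataset_ref, delete_contents=True, not_found_ok=True)
```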
That wraps up everything around creating tables.