google.cloud.bigquery
pour réaliser ce que j'ai écrit ci-dessus.from google.cloud import bigquery
#Spécifiez votre propre ID de projet GCP
project_id = 'YourProjectID'
client = bigquery.Client(project=project_id)
Si vous ne savez pas comment passer la certification dans Colaboratory, veuillez vous référer à l'article que j'ai écrit plus tôt.
Si vous l'exécutez dans un environnement GCE, l'authentification doit passer par défaut en premier lieu.
Si vous souhaitez y accéder dans un autre environnement, créez et chargez le JSON d'authentification selon la référence API officielle ci-dessous.
Trois façons d'accéder à BigQuery avec Colaboratory Référence officielle de l'API
google.cloud.bigquery
: Ver. 1.20.0
Inutile de dire, veuillez l'installer comme ça
pip install google-cloud-bigquery==1.20.0
DataSet est supposé être créé dans la région des États-Unis
** Si le PJ a déjà un DataSet, vous pouvez ignorer complètement cette partie **. Même si vous n'avez pas encore de DataSet, il n'est fondamentalement pas possible de recréer le DataSet de différentes manières, donc une fois que vous l'avez traité, vous pouvez oublier les fonctions ici.
Soit dit en passant, le DataSet de BigQuery correspond au «schéma» des autres bases de données. Mais dans BQ, le schéma a une signification différente, donc DataSet n'est pas appelé un schéma </ font> ici.
Créer un jeu de données
# [demo]Créer un DataSet avec le nom
dataset_name = "demo"
dataset_id = "{}.{}".format(client.project, dataset_name)
dataset = bigquery.Dataset(dataset_id)
#Les États-Unis sont l'endroit le moins cher, je l'utilise donc toujours.Si vous êtes particulier sur la région, veuillez la modifier
dataset.location = "US"
client.create_dataset(dataset)
À propos du processus de création d'une table et de chargement de données dans la table
--Créer une table --Vérifiez la table
En gros, si vous lisez la référence officielle, tout est écrit, mais bon. .. .. Ouaip. .. .. Vous pouvez l'écrire en Qiita. .. ..
-référence- Managing Tables Utilisez les instructions du langage de définition de données (https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language?hl=ja)
Ici, nous allons décrire un exemple de code en supposant que nous allons créer une table d'historique des achats pour les produits suivants.
# | Nom de colonne | Moule | mode | commentaire |
---|---|---|---|---|
1 | TRANSACTION_ID | STRING | REQUIRED | ID de l'historique des achats |
2 | ORDER_TS | TIMESTAMP | REQUIRED | Temps d'achat |
3 | ORDER_DT | DATE | REQUIRED | Date d'achat |
4 | ITEM_CODE | STRING | REQUIRED | ID produit |
5 | ITEM_NAME | STRING | NULLABLE | Nom du produit |
6 | QUANTITY | INTEGER | NULLABLE | Quantité d'achat |
7 | AMOUNT | FLOAT | NULLABLE | Prix d'achat |
8 | DISCOUNT | FLOAT | NULLABLE | Montant de la remise |
9 | CUSTOMER_ID | STRING | REQUIRED | N ° de client |
10 | ITEM_TAG | RECORD | REPEATED | Liste d'étiquettes de produit |
10.1 | TAG_ID | STRING | NULLABLE | ID de balise |
10.2 | TAG_NAME | STRING | NULLABLE | Nom de la balise |
BigQuery lit la définition d'une table en tant que schéma
Par conséquent, diverses définitions seront insérées dans la méthode de bigquery.SchemaField
.
Le nom et le type de champ ne peuvent pas être omis
Les informations de balise sont définies dans un format imbriqué
from google.cloud import bigquery
#Définir le schéma
schema = [
bigquery.SchemaField('TRANSACTION_ID', 'STRING', mode='REQUIRED', description='ID de l'historique des achats'),
bigquery.SchemaField('ORDER_TS', 'TIMESTAMP', mode='REQUIRED', description='Temps d'achat'),
bigquery.SchemaField('ORDER_DT', 'DATE', mode='REQUIRED', description='Date d'achat'),
bigquery.SchemaField('ITEM_CODE', 'STRING', mode='REQUIRED', description='ID produit'),
bigquery.SchemaField('ITEM_NAME', 'STRING', mode='NULLABLE', description='Nom du produit'),
bigquery.SchemaField('QUANTITY', 'INTEGER', mode='NULLABLE', description='Quantité d'achat'),
bigquery.SchemaField('AMOUNT', 'FLOAT', mode='NULLABLE', description='Prix d'achat'),
bigquery.SchemaField('DISCOUNT', 'FLOAT', mode='NULLABLE', description='Montant de la remise'),
bigquery.SchemaField('CUSTOMER_ID', 'STRING', mode='NULLABLE', description='N ° de client'),
bigquery.SchemaField('ITEM_TAG', 'RECORD', mode='REPEATED', description='Informations sur les balises',
fields= [
bigquery.SchemaField('TAG_ID', 'STRING', mode='NULLABLE', description='ID de balise'),
bigquery.SchemaField('TAG_NAME', 'STRING', mode='NULLABLE', description='Nom de la balise'),
]
)
]
Après avoir créé le schéma, l'étape suivante consiste à créer réellement la table
Autres facteurs à considérer en plus du schéma
--Split table (Si vous prévoyez de mettre des données sur 2000 jours, il est préférable de ne pas fractionner la table)
Ce à quoi j'étais accro lors de la création d'une table fractionnée avec BigQuery
Cette fois, nous allons créer une table en tant que table fractionnée et cluster
Créer une table
from google.cloud import bigquery
# project_id = 'YourProjectID'
# client = bigquery.Client(project=project_id)
# dataset_name = "demo"
# dataset_id = "{}.{}".format(client.project, dataset_name)
#Choisissez un nom de table
table_name = "demo_transaction"
table_id = "{}.{}.{}".format(client.project, dataset_id, table_name)
#Utilisez le schéma défini ci-dessus
table = bigquery.Table(table_id, schema=schema)
#Paramètres de table fractionnée(Ici COMMANDER_DT)
table.time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field="ORDER_DT"
)
#Paramètres de la table de cluster
table.clustering_fields = ["ITEM_CODE", "CUSTOMER_ID"]
table.description = "Demo Data"
#Exécuter la création de table
table = client.create_table(table)
Si vous le vérifiez sur la console sur le Web, vous pouvez voir qu'il est défini comme ceci
Confirmer le nom du DataSet ou spécifier l'objet DataSet pour vérifier la liste des tables Peut être confirmé à
# [demo]Vérifiez le tableau dans le DataSet
#Modèle pour vérifier le nom de la table en spécifiant le nom du DataSet
dataset_id = "demo"
for table in client.list_tables(dataset=dataset_id):
print(table.table_id)
#Modèle à vérifier en spécifiant un objet DataSet
dataset_object = client.get_dataset("demo")
for table in client.list_tables(dataset=dataset_object):
print(table.table_id)
Les data scientists importent / exportent fréquemment des données, donc je veux également comprendre cela.
--Importer un fichier local --Lire CSV --Lire JSON --Importer un fichier GCS
Décrivez le modèle qui stocke le fichier au format CSV et les deux modèles qui le stockent en JSON.
Je pense qu'il est courant de stocker des données dans des fichiers CSV, je vais donc suivre ce modèle. Cependant, CSV ne peut pas être utilisé car les informations imbriquées ne peuvent pas être exprimées en CSV dans une table avec des champs imbriqués.
Essayez de charger un nom de table qui ne contient pas d'informations imbriquées en tant que demo_transaciton
Importer csv
#Spécifiez le fichier local
filename = 'demo_transaction.csv'
#Nom du jeu de données
detaset_id = "demo"
#Nom de la table à importer
table_id = "demo_transaction_csv"
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)
#Paramètres d'importation
job_config = bigquery.LoadJobConfig()
#Spécifiez que CSV est la source
job_config.source_format = bigquery.SourceFormat.CSV
#Ignorez la première ligne si le fichier contient des en-têtes
job_config.skip_leading_rows = 1
with open(filename, "rb") as source_file:
job = client.load_table_from_file(source_file, table_ref, job_config=job_config)
#Courir
job.result()
Au fait, si une erreur se produit pour une raison quelconque, vérifiez le contenu de l'erreur avec job.errors
et rechargez-la.
Importer une table avec des données imbriquées avec json
Le format qui peut être importé par json est fixe, et il est nécessaire de contenir des données sous la forme de juger un enregistrement par saut de ligne comme indiqué ci-dessous
format json
{"TRANSACTION_ID":"t0001","ORDER_TS":"2019-11-02 12:00:00 UTC","ORDER_DT":"2019-11-02","ITEM_CODE":"ITEM001","ITEM_NAME":"YYYYY1","QUANTITY":"29","AMOUNT":2200,"DISCOUNT":0,"CUSTOMER_ID":"F0002","ITEM_TAG":[{"TAG_ID":"XXX1", "TAG_NAME":"XYZ1"},{"TAG_ID":"XXX2", "TAG_NAME":"XYZ2"}]}
{"TRANSACTION_ID":"t0002","ORDER_TS":"2019-11-03 12:00:00 UTC","ORDER_DT":"2019-11-03","ITEM_CODE":"ITEM002","ITEM_NAME":"YYYYY2","QUANTITY":"35","AMOUNT":5700,"DISCOUNT":0,"CUSTOMER_ID":"F0002","ITEM_TAG":[]}
{"TRANSACTION_ID":"t0003","ORDER_TS":"2019-11-04 12:00:00 UTC","ORDER_DT":"2019-11-04","ITEM_CODE":"ITEM003","ITEM_NAME":"YYYYY3","QUANTITY":"48","AMOUNT":4200,"DISCOUNT":0,"CUSTOMER_ID":"F0002","ITEM_TAG":[{"TAG_ID":"XXX3", "TAG_NAME":"XYZ3"}]}
S'il y a des données jsonisées dans un tel état, le fichier local peut être importé comme ceci
Importer json
#Spécifiez le nom du fichier local
filename = 'demo_transaction.json'
#Nom du jeu de données
detaset_id = "demo"
#Nom de table avec informations imbriquées
table_id = "demo_transaction"
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)
job_config = bigquery.LoadJobConfig()
#Je vais vous dire que json est le fichier original
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
with open(filename, "rb") as source_file:
job = client.load_table_from_file(source_file, table_ref, job_config=job_config)
#Courir
job.result()
À propos, puisqu'il s'agit de données imbriquées, cela ressemble à ceci une fois visualisé sur la console
-référence- Gist: essayez légèrement l'entrée JSON de Google BigQuery
Il arrive que des données soient importées dans BigQuery en tant que fichier local, mais étant donné que GCP est utilisé, il est préférable de tirer pleinement parti de GCS.
Alors, vérifiez comment importer les données dans GCS
C'est bien que vous n'ayez pas à appeler les bibliothèques liées à GCS tant que vous connaissez le chemin du fichier stocké dans GCS
Exemple en supposant que le fichier CSV est importé dans la table demo_transaction_csv
↓
Charger depuis GCS
#Spécifiez le jeu de données et le nom de la table
detaset_id = "demo"
table_id = "demo_transaction_csv"
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)
#Depuis le chargement du CSV, divers paramètres
job_config = bigquery.LoadJobConfig()
job_config.skip_leading_rows = 1
job_config.source_format = bigquery.SourceFormat.CSV
#Spécifiez le chemin où se trouve le fichier GCS
uri = "gs://{yourbacketname}/demo_transaction.csv"
#Générer un travail
load_job = client.load_table_from_uri(
uri, table_ref, job_config=job_config
)
#Exécuter la charge
load_job.result()
Bien qu'il ne s'agisse pas d'une fonction API officielle, vous pouvez également insérer les données de pd.DataFrame
dans la table BigQuery à l'aide de la fonction côté pandas.
Il est possible de l'insérer en plus dans une table existante, mais j'estime qu'il est souvent utilisé pour exporter le DataFrame après divers traitements en tant que nouvelle table.
Par exemple, extrayez une partie des données de demo.demo_transaction_csv
créées précédemment et écrivez-la comme une autre table.
to_Exporter DataFrame avec gbq
#Préparez une requête pour obtenir une partie des données
query = """
SELECT
TRANSACTION_ID
, ORDER_TS
, ITEM_CODE
, QUANTITY
, AMOUNT
FROM
`{YourProjectID}.demo.demo_transaction_csv`
LIMIT 200
;
"""
#Générer une tâche de requête
query_job = client.query(
query, location='US'
)
#Recevoir les résultats dans une trame de données
df = query_job.to_dataframe()
#Trame de données[demo_transaciton_csv_extracted]Exporter avec le nom de
# if_exists:append ->S'il y a déjà une table, ajoutez-la, sinon créez-en une nouvelle
# if_exists:fail ->Échouer s'il y a déjà une table, créer une nouvelle sinon
# if_exists:replace ->S'il y a déjà une table, remplacez-la, sinon créez-en une nouvelle
detaset_id = "demo"
table_id = "demo_transaciton_csv_extracted"
df.to_gbq(destination_table='{dataset}.{table}'.format(dataset=dataset_id, table=table_id),project_id=project_id, if_exists='append')
Assurez-vous que l'importation fonctionne
Vérifiez si le tableau contient des données
detaset_id = "demo"
dataset_ref = client.dataset(dataset_id)
table_id = "demo_transaciton_csv_extracted"
table_ref = dataset_ref.table(table_id)
table = client.get_table(table_ref)
print("Table has {} rows".format(table.num_rows))
> Table has 200 rows
J'ai d'abord écrit la méthode diabolique, mais il est également possible d'insérer un DataFrame avec l'API
Peut être exécuté sans définir de schéma dans l'exemple de code
pd.Importer DataFrame
import pandas as pd
detaset_id = "demo"
dataset_ref = client.dataset(dataset_id)
table_id = "demo_pandas_data"
table_ref = dataset_ref.table(table_id)
#De manière appropriée pd.Créer des données DataFrame
rows = [
{"item_id": "xx1", "quantity": 1},
{"item_id": "xx2", "quantity": 2},
{"item_id": "xx3", "quantity": 3},
]
dataframe = pd.DataFrame(
rows,
columns=["item_id", "quantity"]
)
#Définir le schéma(Vous pouvez importer sans lui, mais il est plus sûr de l'écrire)
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("item_id", "STRING"),
bigquery.SchemaField("quantity", "INTEGER"),
],
)
# pd.Stocker les données DataFrame dans une table
job = client.load_table_from_dataframe(
dataframe,
table_ref,
job_config=job_config,
location="US",
)
#Courir
job.result()
Evilly extraire des données d'une table existante via DataFrame et les écrire en tant que nouvelle table, mais fondamentalement, je veux l'implémenter avec des fonctions officielles
Lors de l'écriture à l'aide de la fonction API, spécifiez simplement un nouveau nom de table dans QueryJobConfig.destination
.
**Facile! !! !! ** **
Écrire les résultats de la requête en spécifiant une table d'écriture
#Générer la configuration du travail
job_config = bigquery.QueryJobConfig()
detaset_id = "demo"
dataset_ref = client.dataset(dataset_id)
#Définissez le nom de la table dans laquelle écrire
table_id = "demo_transaciton_csv_direct_extracted"
table_ref = dataset_ref.table(table_id)
# (important)Spécifiez la table de destination d'écriture
job_config.destination = table_ref
#Définir la requête dans laquelle écrire
query = """
SELECT
TRANSACTION_ID
, ORDER_TS
, ITEM_CODE
, QUANTITY
, AMOUNT
FROM
`{YourProjectID}.demo.demo_transaction_csv`
LIMIT 300
;
"""
#Générer une tâche de requête
query_job = client.query(
query,
location="US",
job_config=job_config,
)
#Courir
query_job.result()
Je pense que le modèle de définition d'une nouvelle table avec QueryJobConfig.destination
est suffisant, mais je suis également la méthode familière (CREATE TABLE ~ AS SELECT).
Après tout, je l'utilise de manière inattendue. .. ..
Créer une nouvelle table avec une requête
detaset_id = "demo"
#Définissez le nom de la table dans laquelle écrire
table_id = "demo_transaciton_csv_as_select"
#Définir la requête dans laquelle écrire
query = """
DROP TABLE IF EXISTS {dataset}.{table} ;
CREATE TABLE {dataset}.{table} AS
SELECT
TRANSACTION_ID
, ORDER_TS
, ITEM_CODE
, QUANTITY
, AMOUNT
FROM
`{YourProjectID}.demo.demo_transaction_csv`
LIMIT 400
;
""".format(dataset=dataset_id, table=table_id)
#Générer une tâche de requête
job_config = bigquery.QueryJobConfig()
query_job = client.query(
query,
location="US",
job_config=job_config,
)
#Courir(Bien sûr, rien n'est retourné mais c'est écrit correctement)
query_job.result()
Cela devrait couvrir toutes les manières d'importer des données. .. ..
BigQuery est un format de paiement à l'utilisation basé sur des colonnes
Il y a une fonctionnalité sur le service
Cela n'a pas d'importance tant que le volume de données est petit (1 To par mois est gratuit pour les requêtes), mais lorsque vous traitez des données de plusieurs dizaines de To ou plus, vous devez être prudent.
Alors que dois-je faire?
Est la méthode d'adaptation de base
Une table avec plusieurs To de données doit toujours contenir des informations de série chronologique, définissez donc ces informations comme champ à fractionner et créez une table fractionnée.
Notez que vous ne pouvez pas le modifier ultérieurement, sauf si vous le définissez comme table fractionnée lors de la création de la table.
-référence- Présentation des tables partitionnées
Tout d'abord, décrivez le modèle de définition de l'option de fractionnement au stade de la création de la table.
Définir les options de fractionnement
#Décrivez la définition de la table (la colonne de série chronologique est requise)
schema = [
bigquery.SchemaField('TRANSACTION_ID', 'STRING', mode='REQUIRED', description='ID de l'historique des achats'),
bigquery.SchemaField('ORDER_TS', 'TIMESTAMP', mode='REQUIRED', description='Temps d'achat'),
bigquery.SchemaField('ORDER_DT', 'DATE', mode='REQUIRED', description='Date d'achat'),
]
detaset_id = "demo"
table_id = "demo_transaction_time_partition1"
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)
#Création d'objets de table
table = bigquery.Table(table_ref, schema=schema)
#Définir les options de fractionnement
table.time_partitioning = bigquery.TimePartitioning(
#Divisé par jour
type_=bigquery.TimePartitioningType.DAY,
#Définir le champ cible
field="ORDER_DT"
)
table.description = "Time Partition Data"
#Créer une table fractionnée
table = client.create_table(table)
Une table fractionnée peut également être créée à partir d'une table existante avec CREATE TABLE [TABLE_NAME] AS SELECT
La meilleure utilisation consiste à ** recréer une table gonflée qui n'a pas été définie comme table fractionnée **
ʻAS SELECT est précédé de
PARTITION BY [Time Partition Field] `
Requête de création de table avec option de fractionnement
detaset_id = "demo"
#Définissez le nom de la table dans laquelle écrire
table_id = "demo_transaciton_csv_as_select_time_partition"
query = """
DROP TABLE IF EXISTS {dataset}.{table} ;
CREATE TABLE {dataset}.{table}
PARTITION BY
ORDER_DT
AS
SELECT
TRANSACTION_ID
, ORDER_TS
, ORDER_DT
, ITEM_CODE
, QUANTITY
, AMOUNT
FROM
`{YourProjectID}.demo.demo_transaction_csv`
LIMIT 500
;
""".format(dataset=dataset_id, table=table_id)
#Générer une tâche de requête
query_job = client.query(
query,
location="US"
)
#Courir
query_job.result()
Facile! !!
Vous pouvez définir plus de champs de cluster dans la table fractionnée
Spécifiez uniquement le champ de cluster comme une option de la table fractionnée, il est donc extrait et décrit.
-référence- Créer et utiliser des tables en cluster (https://cloud.google.com/bigquery/docs/creating-clustered-tables?hl=ja)
Veuillez vous référer à ce qui suit pour l'effet de la configuration de l'option de cluster.
[BigQuery] Clustered Table Survey
Spécifiez des champs de cluster supplémentaires
"""La table de cluster doit être une table fractionnée
table = bigquery.Table(table_ref, schema=schema)
table.time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field="ORDER_DT"
)
"""
table.clustering_fields = ["ITEM_CODE", "CUSTOMER_ID"]
Ajoutez simplement l'option de cluster CLUSTER BY
même lorsque vous spécifiez dans SQL
Extrait de la partie addition d'option de cluster
query =
"""
DROP TABLE IF EXISTS {dataset}.{table} ;
CREATE TABLE {dataset}.{table}
PARTITION BY
ORDER_DT
CLUSTER BY
ITEM_CODE, CUSTOMER_ID
AS
SELECT
*
FROM
`{YourProjectID}.demo.demo_transaction_csv`
LIMIT 500
;
""".format(dataset=dataset_id, table=table_id)
Huh. .. .. Enfin, la partie pour mettre les données est terminée. .. ..
Vient ensuite la partie Export, mais la méthode d'exportation de la table elle-même consiste essentiellement à exporter vers GCS
Exportez le contenu de la table en spécifiant le bucket GCS
Si vous ne spécifiez pas job_config
, il sera écrit sous forme de fichier csv.
Normalement, c'est csv, donc les tables contenant des colonnes imbriquées ne peuvent pas être exportées avec csv
Exporter au format csv
#Spécifiez la table à exporter
detaset_id = "demo"
dataset_ref = client.dataset(dataset_id)
table_id = "demo_transaciton_csv"
table_ref = dataset_ref.table(table_id)
#Stocker le fichier dans le compartiment spécifié
bucket_name = "{Your Bucket Name}"
output_name = "{}.csv".format(table_id)
destination_uri = "gs://{}/{}".format(bucket_name, output_name)
#Générer une tâche d'exportation
extract_job = client.extract_table(
table_ref,
destination_uri,
location="US",
)
#Courir
extract_job.result()
Si vous exportez la table telle quelle, la quantité de données sera importante telle quelle, donc je souhaite définir l'option de compression
ʻExtractJobConfig` définit les options de sortie et compresse
Vous pouvez contrôler si vous souhaitez exporter l'en-tête en définissant l'option print_header
(la valeur par défaut est True)
Option de compression ajoutée(Compresser avec gzip)
destination_uri = "gs://{YourBucket}/{filename}.gz"
job_config = bigquery.ExtractJobConfig(
compression="GZIP",
print_header=True
)
#Générer une tâche d'exportation
extract_job = client.extract_table(
table_ref,
destination_uri,
job_config=job_config,
location="US",
)
#Courir
extract_job.result()
S'il existe une colonne imbriquée, elle ne peut pas être exportée avec csv, alors exportez-la avec json ou avro.
json peut être compressé, mais avro ne prend pas en charge les options de compression
Tables imbriquées dans json ou Avro
output_name = "{}.json".format(table_id)
destination_uri = "gs://{}/{}".format(bucket_name, output_name)
#Exporter avec json(Pas de sortie d'en-tête)
job_config = bigquery.ExtractJobConfig(
destination_format = "NEWLINE_DELIMITED_JSON",
print_header = False
)
#Courir
extract_job = client.extract_table(
table_ref,
destination_uri,
job_config=job_config,
)
extract_job.result()
Au fait, la valeur par défaut est csv, mais vous pouvez également exporter avec tsv
Définir l'option tsv
# job_Ajouter l'option delimita à la configuration
job_config = bigquery.ExtractJobConfig(
field_delimiter="\t"
)
Si vous souhaitez supprimer la table, spécifiez simplement le nom de la table
Supprimer la table
# from google.cloud import bigquery
# project_id = 'YourProjectID'
# client = bigquery.Client(project=project_id)
detaset_id = "{YourDataSetId}"
dataset_ref = client.dataset(dataset_id)
table_id = "{YourTableId}"
table_ref = dataset_ref.table(table_id)
#Supprimer la table
client.delete_table(table_ref)
C'est la fin de l'histoire autour de la création de table
Recommended Posts