[PYTHON] 4 façons de mettre à jour les différences entre les feuilles de calcul et BigQuery

Ce que je voulais faire, c'était importer les informations de la feuille de calcul dans BigQuery et mettre à jour les données de différence.

J'étais en difficulté

À l'origine, le processus était exécuté selon le flux suivant.

図7.png

Cependant, dans ce cas, les données n'étaient souvent pas mises à jour même si les données étaient insérées immédiatement après la création d'une nouvelle table. Lors de l'exécution de SQL, bien que l'on puisse voir que l'exécution est terminée avec le statut de job id, Parfois, le décalage horaire était assez grand et je pouvais l'insérer avec le sommeil, Je cherchais une méthode stable car je ne pouvais pas le faire.

Conclusion

Il a été résolu en changeant le flux ci-dessus en flux suivant.

図11.png

Pour un traitement qui crée une table et télécharge les données séparément, mais en même temps Quand je l'ai changé, ça a bien fonctionné. C'est naturel de demander, mais je ne savais pas comment l'écrire, alors comment après avoir créé le tableau Je regardais juste les méthodes de "comment saisir l'achèvement de l'exécution?" Et "comment effectuer le traitement en attente?".

Ce qui est écrit dans cet article

Cette fois, seules les données de différence des informations mises à jour dans la feuille de calcul J'écris le contenu pour réfléchir.

Les données de la feuille de calcul sont disponibles ci-dessous.

図8.png

Nom japonais sur la première ligne, colonne pour BigQuery sur la deuxième ligne, type de données sur la troisième ligne, C'est une structure de données que les données réelles sont contenues dans la 4ème ligne et les suivantes.

Je voulais vraiment inclure les paramètres PK, mais Cette fois, il n'est pas inclus comme objectif, donc si l'ID dans la colonne A correspond, il sera mis à jour S'ils ne correspondent pas, ils seront mis à jour.

La table BigQuery sera mise à jour comme suit.

図6.png

4 façons de le faire

J'ai pratiqué les quatre choses suivantes.

① Créer une table en exécutant SQL ⇒ Mettre à jour ② Créez une table avec google-cloud-bigquery ⇒ Mise à jour à partir de pandas ③ Créer / mettre à jour une table avec pandas-gbq ④ Créer / mettre à jour une table avec une requête google-cloud-big

① et ② sont des modèles qui ne peuvent pas être téléchargés après la création d'une table dans le flux initialement prévu. ③ et ④ sont des modèles qui ont réussi en téléchargeant des données en même temps que la création de la table.

Je pense que certaines personnes ont les mêmes problèmes, j'ai donc énuméré ① et ② également.

Vous trouverez ci-dessous l'introduction du code.

Code de pièce commun

Authentification


from google.colab import auth
auth.authenticate_user()

Lorsque vous exécutez ce qui précède, le code d'authentification suivant s'affiche. 図4.png

Un autre article montre également comment le faire sans code d'autorisation, Si vous souhaitez fusionner avec des données de tables de différents PJ Je pense que vous devriez entrer le code de vérification à chaque fois.

Référence: Comment exécuter avec Google Colaboratory sans code d'autorisation

L'opération de feuille de calcul et l'opération BigQuery sont écrites dans un article séparé, les détails sont donc omis. Comment utiliser les feuilles de calcul en Python (comment utiliser gspread) Comment utiliser BigQuery avec Python

Se préparer à utiliser la feuille de calcul


!pip install gspread
from google.colab import auth
from oauth2client.client import GoogleCredentials
import gspread

#Processus d'authentification
auth.authenticate_user()
gc = gspread.authorize(GoogleCredentials.get_application_default())

ss_id = `ID de la feuille de calcul`
sht_name = 'Nom de la feuille'

workbook = gc.open_by_key(ss_id)
worksheet = workbook.worksheet(sht_name)
values = worksheet.get_all_values()

values
# [['ID', 'Nom', 'âge', 'Choses favorites'],
#  ['id', 'name', 'age', 'favorit'],
#  ['int64', 'string', 'int64', 'string'],
#  ['1', 'Bob', '25', 'apple'],
#  ['2', 'Tom', '32', 'orange'],
#  ['3', 'Jay', '28', 'grape']]

Prêt à utiliser BigQuery


project_id = 'ID du projet GCP'
dateset = 'Nom du jeu de données'
table_name = 'nom de la table'

from google.cloud import bigquery
client = bigquery.Client(project=project_id)

Jusque-là, ce sera la partie partagée entre ① et ④.

(1) Créer / mettre à jour la table en exécutant SQL

Je ne sais pas si l'exécution de l'expression SQL est correcte, C'est une méthode qui peut être exécutée avec le même SQL que celui utilisé avec l'éditeur de requêtes.

Je pense que le mérite est que vous pouvez voir le statut par «facile à écrire» et «ID de poste».

Création SQL pour la création de table temporaire


table_id = f'{project_id}.{dateset}.{table_name}'
query = f'create or replace table `{table_id}_temp`(\n'

for column, dtype in zip(values[1],values[2]):
  query += f'{column} {dtype},\n'
query = query[:-2] + ')'
query

# create or replace table `{project_id}.{dateset}.{table_name}_temp`(
# id int64,
# name string,
# age int64,
# favorit string)

Exécution SQL


#Courir
result = client.query(query)

#Vérifier l'état du travail
client.get_job(result.job_id, location='asia-northeast1').state

# 'DONE'

La méthode de création du tableau jusqu'à présent est différente entre ① et ③, Le SQL pour mettre à jour les différences suivantes est couramment utilisé.

Création SQL pour mise à jour


query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''

insert_query = ''
for val in values[1][1:]:
  query += f'{val} = S.{val},\n'
  insert_query += f'{val}, '
insert_query = insert_query[:-2]

query = query[:-2] + '\n'

query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'
query

# merge into `{project_id}.{dateset}.{table_name}` as T
# using `{project_id}.{dateset}.{table_name}_temp` as S
# on T.id = S.id
# when matched then
# update set
# name = S.name,
# age = S.age,
# favorit = S.favorit
# when not matched then
# insert(id, name, age, favorit)
# values(id, name, age, favorit)

Exécution SQL pour la mise à jour


client.query(query)

Après avoir tout exécuté, lorsque vous regardez le résultat de la requête sur BigQuery Il a réussi comme le montre l'image ci-dessous.

図9.png

Cependant, si vous regardez le tableau, il sera dans un état vide comme indiqué ci-dessous. Le statut du travail est "DONE", veuillez donc le refléter ...

図2.png

Les données sont reflétées après environ 5 minutes en raison d'un problème de serveur. C'était inutile car je voulais le faire fonctionner en continu immédiatement.

② Créer / mettre à jour une table avec une requête google-cloud-big

Il s'agit d'un cas où les informations de la feuille de calcul sont téléchargées avec le contenu répertorié ici. https://googleapis.dev/python/bigquery/latest/usage/tables.html

Supprimer la table et créer un schéma


table_id = f'{project_id}.{dateset}.{table_name}'
client = bigquery.Client(project = project_id)

#Supprimer s'il y a une table
client.delete_table(f'{table_id}_temp', not_found_ok=True)

schema = []
for column, dtype in zip(values[1],values[2]):
  schema.append(bigquery.SchemaField(column, dtype, mode="NULLABLE"))
schema

# [SchemaField('id', 'INT64', 'NULLABLE', None, ()),
#  SchemaField('name', 'STRING', 'NULLABLE', None, ()),
#  SchemaField('age', 'INT64', 'NULLABLE', None, ()),
#  SchemaField('favorit', 'STRING', 'NULLABLE', None, ())]

Créer une table temporaire


table = bigquery.Table(f'{table_id}_temp', schema=schema)
client.create_table(table)

Ce qui suit est le même code que ①, il est donc résumé.

Créer et exécuter SQL pour la mise à jour


df = pd.DataFrame(values[3:], columns=values[1])
#Suppression des blancs, traitement des blancs
df = df.replace({'': None})
df = df.dropna(subset=[values[1][0]])

#Mettre à jour la table temporaire
client.insert_rows(table, df.values.tolist())

#Création de SQL pour la mise à jour
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''

insert_query = ''
for val in values[1][1:]:
  query += f'{val} = S.{val},\n'
  insert_query += f'{val}, '
insert_query = insert_query[:-2]

query = query[:-2] + '\n'

query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'

#Exécutez SQL pour mettre à jour les données
client.query(query)

Même avec cela, la probabilité qu'une table de person_temp soit créée était faible comme dans ①. J'utilisais beaucoup ce code lors de la liaison avec DP, À ce moment-là, je m'endors environ 20 secondes pour que ça marche.

③ Créer / mettre à jour une table avec pandas-gbq

Je ne m'attendais pas à grand-chose car certains autres blogs disaient que c'était mal. Je suis content de l'avoir essayé.

Créer un DataFrame


#Nécessite une installation pour utiliser
!pip install pandas-gbq

df = pd.DataFrame(values[3:], columns=values[1])
#Suppression des blancs, traitement des blancs
df = df.dropna(subset=[values[1][0]])
df = df.replace({'': None})

La création de ce schéma crée un type de dictionnaire de «nom» et «type».

Créer un schéma


schema = []
for column, dtype in zip(values[1],values[2]):
  schema.append({'name': column, 'type': dtype})
schema

# [{'name': 'id', 'type': 'int64'},
#  {'name': 'name', 'type': 'string'},
#  {'name': 'age', 'type': 'int64'},
#  {'name': 'favorit', 'type': 'string'}]

Mettre à jour vers la table temporaire


df.to_gbq(f'{dateset}.{table_name}_temp', project_id, if_exists='replace', table_schema=schema)

Créer et exécuter SQL pour la mise à jour


query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''

insert_query = ''
for val in values[1]:
  query += f'{val} = S.{val},\n'
  insert_query += f'{val}, '
insert_query = insert_query[:-2]

query = query[:-2] + '\n'

query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'

client.query(query)

J'ai pu mettre à jour ici sans aucun problème. Cependant, si vous regardez le site ci-dessous, il est dit que vous devriez passer de pandas-gbq à google-cloud-bigquery. https://cloud.google.com/bigquery/docs/pandas-gbq-migration?hl=ja

Donc, ensuite, j'ai décrit la méthode en utilisant google-cloud-bigquery.

④ Créer / mettre à jour une table avec une requête google-cloud-big

Installez pyarrow


#Installez pyarrow
!pip install pyarrow

table_id = f'{project_id}.{dateset}.{table_name}'

df = pd.DataFrame(values[3:], columns=values[1])
#Suppression des blancs, traitement des blancs
df = df.replace({'': None})
df = df.dropna(subset=[values[1][0]])

Création d'un schéma et modification du type de données d'un DataFrame


schema = []
for column, dtype in zip(values[1],values[2]):
  schema.append(bigquery.SchemaField(column,dtype))
  if dtype != 'string':
    df[column] = df[column].astype(dtype)
schema

google-cloud-bigquery est différent de pandas-gbq Une erreur se produit si les types de données de la table et DataFrame ne correspondent pas. Le cadre officiel est plus fin ...

Vous pouvez vérifier si le type de données a changé avec df.dtpyes.

dtypes


df.dtypes

# id          int64
# name       object
# age         int64
# favorit    object
# dtype: object

Ensuite, effectuez les réglages à effectuer avec load_table_from_dataframe () pour être exécutés plus tard. «config» signifie «config» en japonais.

Vous trouverez ci-dessous une liste d'attributs pouvant être définis. https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.LoadJobConfig.html

Cette fois, schema et write_disposition seront définis pour écraser la table existante lorsqu'elle existe.

paramètres Jon


job_config = bigquery.LoadJobConfig(
  schema = schema,
  write_disposition = 'WRITE_TRUNCATE'
)

Télécharger les données df


job = client.load_table_from_dataframe(
  df, f'{table_id}_temp', job_config=job_config
)

# Wait for the load job to complete.
job.result()

Ce qui suit est le même code que ①, il est donc résumé.

Créer et exécuter SQL pour la mise à jour


query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''

insert_query = ''
for val in values[1]:
  query += f'{val} = S.{val},\n'
  insert_query += f'{val}, '
insert_query = insert_query[:-2]

query = query[:-2] + '\n'

query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'

client.query(query)

Je pense que ③ est meilleur car il est plus facile de spécifier le type de données. ④ va-t-il plus vite?

Une autre chose qui m'inquiète, c'est quand la quantité de données augmente. Étant donné que la limite d'insertion dans la requête google-cloud-big était de 10 000 enregistrements Si vous voulez faire plus que cela, vous devrez peut-être effectuer chaque traitement.

Dans ce cas, remplacez la table uniquement la première fois. A partir de la deuxième fois, il ne semble y avoir aucun problème si le processus est inséré.

Recommended Posts

4 façons de mettre à jour les différences entre les feuilles de calcul et BigQuery
Mettre à jour python-social-auth de 0.1.x à 0.2.x
Mettre à jour Mac Python de 2 à 3
Comment mettre à jour Google Sheets à partir de Python
Changements de Python 3.0 à Python 3.5
Comment mettre à jour easy_install
Transition de WSL1 à WSL2
Comment mettre à jour Spyder
Utilisez BigQuery depuis Python.
Exécutez BigQuery à partir de Lambda
mettre à jour django version 1.11.1 vers 2.2
De l'édition à l'exécution