[PYTHON] Utilisez Cloud Composer pour accéder régulièrement à l'API Youtube afin de créer un pipeline afin de stocker les résultats dans Bigquery

introduction

Bonjour. Je suis data scientist dans une entreprise d'IA. Récemment, je suis accro au Tokai On Air de Youtuber, mais je me suis soudainement demandé à quelle vitesse ils augmentaient le nombre total de vues et le nombre d'inscrits, alors je frappe régulièrement l'API Youtube avec Cloud Composer J'ai écrit un code pour stocker la transition quotidienne des valeurs numériques dans Bigquery.

En passant, vous pouvez voir la transition du nombre de vues pour le mois dernier à partir des sites suivants, mais je voulais collecter et analyser les données pendant une période plus longue, donc je le fais cette fois. [Facile] 3 façons de vérifier la transition du nombre d'abonnés à la chaîne YouTube

Paramètres initiaux de l'API Youtube

Effectuez les réglages initiaux pour pouvoir accéder à l'API Youtube. Fondamentalement, si vous recherchez l'API de données YouTube dans l'élément Services de GCP et que vous l'activez, vous pourrez la gérer. Après cela, enregistrez la clé API Youtube. Vous pouvez en savoir plus sur ce domaine en consultant le Site officiel.

Paramètres d'environnement de service GCP

Il n'y a aucun problème si vous définissez divers paramètres en vous référant au site suivant. Cependant, j'utilise l'interface utilisateur pour configurer Cloud Composer et Bigquery car il est plus facile à configurer à l'aide de l'interface utilisateur. Procédure de création d'un flux de travail pour manipuler les tables BigQuery dans GCP Cloud Composer

Cloud Composer Définissez uniquement les pièces suivantes et appuyez sur "Créer" sans rien définir d'autre. Créer un compositeur prend un temps considérable. composer.png

Installation de la bibliothèque

Après avoir créé le composeur, procédez aux tâches suivantes. Ici, si youtube-API n'est pas installé sur le composeur, vous ne pourrez pas accéder à l'API Youtube du compositeur plus tard, alors installez l'API Youtube sur le compositeur à l'avance. Commencez par créer le fichier de contenu suivant.

requirement.txt


youtube-data-api

Importez ce fichier dans Cloud Shell. Vous pouvez télécharger en vous référant au Site officiel. Exécutez ensuite le code suivant sur Cloud Shell.

gcloud config set core/projet Nom du projet
les environnements gcloud composer mettent à jour le nom du compositeur(toukai-onair-composer) --update-pypi-packages-from-file requirements.txt --emplacement Nom de l'emplacement(asia-northeast1)

Bigquery Créez la table comme décrit sur le site suivant. Créer et utiliser des tableaux

Créer une définition de DAG pour Composer

Maintenant que les préférences sont à peu près terminées, appuyez sur l'API Youtube et écrivez le code à stocker dans Bigquery.

Code qui atteint l'API Youtube

Vous trouverez ci-dessous le code qui renvoie le nombre total de vues et d'abonnés pour un canal particulier, étant donné channel_id et api_key.

youtubelib.py


from apiclient.discovery import build

class Youtubeapi:
    def __init__(self, channel_id, api_key):

        self.channel_id = channel_id
        if not channel_id:
            raise Exception("need channel_id")

        self.api_key = api_key
        if not api_key:
            raise Exception("need api_key")

    def get_statistics_data(self):
        youtube = build('youtube', 'v3', developerKey=self.api_key)
        search_response = youtube.channels().list(
            part='statistics',
            id=self.channel_id,
        ).execute()

        return search_response['items'][0]['statistics']

À propos, channel_id est répertorié dans le lien URL lorsque vous passez par votre Youtuber préféré sur Youtube et que vous y entrez. S'il s'agit de Tokai On Air, le lien est le suivant, donc le channel_id sera ʻUCutJqz56653xV2wwSvut_hQ`.

https://www.youtube.com/channel/UCutJqz56653xV2wwSvut_hQ

Vous pouvez obtenir l'api_key même lorsque vous activez l'API Youtube, mais vous pouvez également le vérifier à partir des informations d'identification suivantes. スクリーンショット 2020-06-01 8.27.54.png

Définition de DAG à stocker dans Bigquery

Vous trouverez ci-dessous le code DAG qui utilise la classe Youtubeapi pour accéder à l'API Youtube et stocke les données dans Bigquery.

dag.py


from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from google.cloud import bigquery

from lib import youtubelib

start = DummyOperator(task_id='start')

GCP_PROJECT =Nom du projet
BQ_YOUTUBE_DATASET =Nom du jeu de données.nom de la table

YOUTUBE_CHANNEL_ID =Identifiant Tokai On Air Channel('UCutJqz56653xV2wwSvut_hQ')
YOUTUBE_API_KEY =Clé API Youtube

# see https://cloud-textbook.com/69/#_start_dateschedule_intervalcatchup
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 5, 18),
    'retries': 0,
    'max_active_runs': 1,
}

schedule_interval = '0 17 * * *'

# define dag
dag = DAG('toukai_trends', default_args=default_args,
          schedule_interval=schedule_interval, catchup=False)

youtubeapi = youtubelib.Youtubeapi(channel_id=YOUTUBE_CHANNEL_ID, api_key=YOUTUBE_API_KEY)


def pull_youtube_statistics_api(ds, **kwargs):
    statistics = youtubeapi.get_statistics_data()
    dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print('dt', dt)
    viewcount = statistics['viewCount']
    print('viewCount', viewcount)
    subscribercount = statistics['subscriberCount']
    print('subscriberCount', subscribercount)
    videocount = statistics['videoCount']
    print('videoCount', videocount)

    bigquery_client = bigquery.Client()
    query = '''
    INSERT
        `{0}.{1}`
    SELECT
        CAST("{2}" AS timestamp) AS datetime,
        CAST("{3}" AS INT64) AS viewcount,
        CAST("{4}" AS INT64) as subscribercount,
        CAST("{5}" AS INT64) as videocount
    '''.format(GCP_PROJECT, BQ_YOUTUBE_DATASET, str(dt), str(viewcount), str(subscribercount), str(videocount))

    rows = bigquery_client.query(query).result()

    return 'ok'


job_transactiondetail_puller = PythonOperator(
    task_id='pull_youtube_statistics_api',
    provide_context=True,
    python_callable=pull_youtube_statistics_api,
    dag=dag,
)

Il y a divers points compliqués concernant l'exécution régulière de composer (définir la partie dag), donc l'exécution régulière de Comparaison approfondie du service cloud / explication approfondie (version 2020) Veuillez vérifier l'édition confuse de l'histoire (start_date, schedule_interval, catchup, etc.).

Importer la définition de DAG dans Cloud Composer

Importez dag.py dans Cloud Shell et procédez comme suit sur Shell:

gcloud composer environments storage dags import --environment=nom du compositeur
(toukai-onair-composer) --location=asia-northeast1 --source=./dag.py

Lorsque vous l'exécutez, un fichier est créé sur GCS et youtubelib.py y est également placé.

Contrôle de fonctionnement

Identique à la vérification de l'état d'exécution de Cloud Composer DAG dans Procédure de création d'un flux de travail pour manipuler les tables BigQuery dans GCP Cloud Composer Si vous le vérifiez, il n'y a pas de problème.

Vous pouvez également consulter le journal des résultats d'exécution à partir de afficher les journaux dans Airflow.

Un sentiment de prix

Je l'ai réalisé quand je l'ai essayé, mais c'est toujours cher. C'était un niveau pénible pour mon argent de poche, alors j'ai arrêté de le faire fonctionner pendant longtemps ... (Dans ce cas, la QOL enregistrée dans Youtube Premium est élevée ...) Pour une idée du prix, veuillez consulter la «Price Edition» de Comparaison approfondie et explication approfondie des services cloud (version 2020).

Résumé

J'ai essayé de le fabriquer, mais cela m'a paru difficile de l'utiliser personnellement car c'était trop cher ...

La prochaine fois, je me demande si cela peut être moins cher en exécutant régulièrement Cloud Finction, alors je pense à le créer.

Ayons une bonne vie sur Youtube! !!

Recommended Posts

Utilisez Cloud Composer pour accéder régulièrement à l'API Youtube afin de créer un pipeline afin de stocker les résultats dans Bigquery
[sh] Comment stocker les résultats de l'exécution de la commande dans des variables
Essayez d'utiliser l'API Wunderlist en Python
Essayez d'utiliser l'API Kraken avec Python
Essayez d'accéder à l'API Spotify dans Django.
Convertir l'API cURL en script Python (à l'aide du stockage d'objets IBM Cloud)
Essayez d'utiliser l'API BitFlyer Ligntning en Python
Essayez d'utiliser l'API DropBox Core avec Python
Essayez de faire une stratégie de blackjack en renforçant l'apprentissage (② Enregistrer l'environnement dans le gymnase)
J'ai essayé de créer un pipeline ML avec Cloud Composer
Essayez de supprimer des tweets en masse à l'aide de l'API de Twitter
J'ai créé une classe pour obtenir le résultat de l'analyse par MeCab dans ndarray avec python
Comment générer une requête à l'aide de l'opérateur IN dans Django
Essayez de juger des photos de plats à l'aide de l'API Google Cloud Vision
Essayez de modéliser une distribution multimodale à l'aide de l'algorithme EM
Essayez d'utiliser l'API Twitter
Essayez d'utiliser l'API Twitter
Essayez d'utiliser l'API PeeringDB 2.0
Essayez de modifier une nouvelle image à l'aide du modèle StyleGAN2 entraîné
Le moyen le plus simple de créer un environnement d'utilisation Spleeter à l'aide de Windows
Vérifiez le résultat du dessin à l'aide de Plotly en incorporant CodePen dans Qiita
[AWS / Tello] Construction d'un système d'exploitation de drones sur le cloud
Jouez en accédant à l'API Riot Games en Python Première moitié
Essayez d'utiliser FireBase Cloud Firestore avec Python pour le moment
Si vous voulez être guéri par l'image de Mia Nanasawa, appuyez sur l'API Twitter ♪
[CleanArchitecture avec Python] Appliquez CleanArchitecture à une API simple étape par étape, et essayez de comprendre "quel type de changement est fort" dans la base de code.