[PYTHON] Try to build a pipeline that stores results in BigQuery by hitting the YouTube API regularly using Cloud Composer

Introduction

Hello. I'm a data scientist at an AI startup. Lately I've been hooked on the YouTubers Tokai OnAir, and I suddenly wondered how quickly their total view count and subscriber count are growing. So I wrote code that hits the YouTube API on a schedule with Cloud Composer and stores the daily transitions in BigQuery.

By the way, you can see the view-count transition for the past month on the following site, but I wanted to collect and analyze data over a longer period, which is why I built this. [Easy] 3 ways to check the transition of the number of YouTube channel subscribers

YouTube API initial setup

Set things up so that you can hit the YouTube API. Basically, search for the YouTube Data API under the APIs & Services section of GCP and enable it, then save the YouTube API key. The Official Site covers the details of this part.
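
If you prefer the command line, the same API can be enabled with gcloud; a minimal sketch, assuming the standard service name for the YouTube Data API v3 and a placeholder project:


# Enable the YouTube Data API v3 for your project (project name is a placeholder)
gcloud services enable youtube.googleapis.com --project=<project name>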

GCP service settings

There is no problem if you configure everything by referring to the following site. However, I configure Cloud Composer and BigQuery through the UI, because it's easier that way. Procedure for creating a workflow for manipulating BigQuery tables with GCP Cloud Composer

Cloud Composer: set only the parts below and press "Create" without configuring anything else. Creating the Composer environment takes quite a while. composer.png
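
For reference, the environment can also be created from the CLI instead of the UI; a sketch, assuming the environment name and location used elsewhere in this article:


# Create a Composer environment (adjust name, location, and Python version to your setup)
gcloud composer environments create toukai-onair-composer \
    --location asia-northeast1 \
    --python-version 3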

Library installation

Once the Composer environment is created, proceed as follows. If the YouTube API library is not installed in Composer, you won't be able to hit the YouTube API from Composer later, so install it in advance. First, create a file with the following contents.

requirements.txt


youtube-data-api

Upload this file to Cloud Shell. You can find out how to upload on the Official Site. Then run the following commands in Cloud Shell.

gcloud config set core/project <project name>
gcloud composer environments update <composer name (e.g. toukai-onair-composer)> --update-pypi-packages-from-file requirements.txt --location <location (e.g. asia-northeast1)>
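
If you want to confirm that the package was registered, describing the environment should show it; a sketch (the format path config.softwareConfig.pypiPackages is from memory, so treat it as an assumption and check your gcloud version):

# List the PyPI packages registered on the environment
gcloud composer environments describe toukai-onair-composer \
    --location asia-northeast1 --format="value(config.softwareConfig.pypiPackages)"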

BigQuery: create the table as described on the following site. Creating and using tables (https://cloud.google.com/bigquery/docs/tables?hl=ja)
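
For reference, here is a minimal sketch in Python that creates a table matching the columns the DAG below inserts; the project, dataset, and table names are placeholders of my own, not from the original article:


from google.cloud import bigquery

# Minimal sketch: create the table that the DAG below inserts into.
# "your-project" and "your_dataset.youtube_stats" are placeholders.
client = bigquery.Client(project="your-project")
schema = [
    bigquery.SchemaField("datetime", "TIMESTAMP"),
    bigquery.SchemaField("viewcount", "INT64"),
    bigquery.SchemaField("subscribercount", "INT64"),
    bigquery.SchemaField("videocount", "INT64"),
]
table = bigquery.Table("your-project.your_dataset.youtube_stats", schema=schema)
client.create_table(table)  # creates an empty table with the four columns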

Create Composer DAG definition

Now that the setup is mostly done, let's write the code that hits the YouTube API and stores the results in BigQuery.

Code that hits the YouTube API

Below is the code that, given a channel_id and api_key, returns the total view count and subscriber count for a particular channel.

youtubelib.py


from apiclient.discovery import build  # provided by the google-api-python-client package


class Youtubeapi:
    def __init__(self, channel_id, api_key):
        if not channel_id:
            raise ValueError("need channel_id")
        self.channel_id = channel_id

        if not api_key:
            raise ValueError("need api_key")
        self.api_key = api_key

    def get_statistics_data(self):
        # channels.list with part='statistics' returns viewCount,
        # subscriberCount, and videoCount for the channel.
        youtube = build('youtube', 'v3', developerKey=self.api_key)
        search_response = youtube.channels().list(
            part='statistics',
            id=self.channel_id,
        ).execute()

        return search_response['items'][0]['statistics']
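
A minimal usage sketch (the api_key value is a placeholder; the channel ID is Tokai OnAir's, explained next):


from youtubelib import Youtubeapi

api = Youtubeapi(channel_id='UCutJqz56653xV2wwSvut_hQ', api_key='YOUR_API_KEY')
stats = api.get_statistics_data()
print(stats['viewCount'], stats['subscriberCount'], stats['videoCount'])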

By the way, the channel_id appears in the URL when you open your favorite YouTuber's channel page on YouTube. For Tokai OnAir the link is as follows, so the channel_id is `UCutJqz56653xV2wwSvut_hQ`.

https://www.youtube.com/channel/UCutJqz56653xV2wwSvut_hQ

You can get the api_key at the time you enable the YouTube API, but you can also check it later from the Credentials page. (screenshot: Credentials page)

DAG definition to store in BigQuery

Below is the DAG code that uses the Youtubeapi class to hit the YouTube API and store the data in BigQuery.

dag.py


from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from google.cloud import bigquery

from lib import youtubelib

GCP_PROJECT = 'project-name'  # your GCP project ID
BQ_YOUTUBE_DATASET = 'dataset_name.table_name'  # dataset and table to insert into

YOUTUBE_CHANNEL_ID = 'UCutJqz56653xV2wwSvut_hQ'  # Tokai OnAir channel ID
YOUTUBE_API_KEY = 'your-youtube-api-key'  # the API key saved earlier

# see https://cloud-textbook.com/69/#_start_dateschedule_intervalcatchup
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 5, 18),
    'retries': 0,
}

schedule_interval = '0 17 * * *'  # daily at 17:00 UTC (02:00 JST)

# define dag
# max_active_runs is a DAG argument, not a default_arg, so it goes here
dag = DAG('toukai_trends', default_args=default_args,
          schedule_interval=schedule_interval, catchup=False,
          max_active_runs=1)

start = DummyOperator(task_id='start', dag=dag)

youtubeapi = youtubelib.Youtubeapi(channel_id=YOUTUBE_CHANNEL_ID, api_key=YOUTUBE_API_KEY)


def pull_youtube_statistics_api(ds, **kwargs):
    # Fetch the latest channel statistics from the YouTube API.
    statistics = youtubeapi.get_statistics_data()
    dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print('dt', dt)
    viewcount = statistics['viewCount']
    print('viewCount', viewcount)
    subscribercount = statistics['subscriberCount']
    print('subscriberCount', subscribercount)
    videocount = statistics['videoCount']
    print('videoCount', videocount)

    # Append one row to BigQuery with an INSERT ... SELECT statement.
    bigquery_client = bigquery.Client()
    query = '''
    INSERT
        `{0}.{1}`
    SELECT
        CAST("{2}" AS TIMESTAMP) AS datetime,
        CAST("{3}" AS INT64) AS viewcount,
        CAST("{4}" AS INT64) AS subscribercount,
        CAST("{5}" AS INT64) AS videocount
    '''.format(GCP_PROJECT, BQ_YOUTUBE_DATASET, dt, viewcount, subscribercount, videocount)

    bigquery_client.query(query).result()  # wait for the insert job to finish

    return 'ok'


job_transactiondetail_puller = PythonOperator(
    task_id='pull_youtube_statistics_api',
    provide_context=True,
    python_callable=pull_youtube_statistics_api,
    dag=dag,
)

start >> job_transactiondetail_puller

There are various confusing points around Composer's scheduled execution (the define dag part), so check out the section on scheduled execution (start_date, schedule_interval, catchup, etc.) in Thorough Comparison / Explanation of Cloud Services (2020 edition). A quick illustration of how these parameters interact follows.
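
Here is that illustration as I understand it (a sketch with a hypothetical DAG id, not part of the original article):


from datetime import datetime
from airflow import DAG

# With catchup=False, Airflow does not backfill the daily runs between
# start_date (2020-05-18) and today; only the most recent completed
# interval gets scheduled. With catchup=True it would enqueue one run
# per missed day since start_date.
dag = DAG(
    'catchup_example',  # hypothetical DAG id for illustration
    default_args={'owner': 'airflow', 'start_date': datetime(2020, 5, 18)},
    schedule_interval='0 17 * * *',  # daily at 17:00 UTC
    catchup=False,
)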

Upload DAG definition to Cloud Composer

Upload dag.py to Cloud Shell and run the following in the shell:

gcloud composer environments storage dags import --environment=<composer name (e.g. toukai-onair-composer)> --location=asia-northeast1 --source=./dag.py

When you run it, dag.py is placed in the DAG folder on GCS. Since dag.py does `from lib import youtubelib`, youtubelib.py also needs to be placed there, under a lib/ subdirectory.
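
One way to do that is the same dags import command with a destination subdirectory (a sketch; to my knowledge the command accepts --destination, but check your gcloud version):

gcloud composer environments storage dags import --environment=<composer name> --location=asia-northeast1 --source=./youtubelib.py --destination=lib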

Operation check

There is no problem if you check it in the same way as the "Checking Cloud Composer DAG execution status" section of Procedure for creating a workflow for manipulating BigQuery tables with GCP Cloud Composer.
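
You can also run Airflow CLI commands through gcloud; a sketch assuming an Airflow 1.x environment (where the subcommand is list_dags) and the environment name used above:

# Confirm that the toukai_trends DAG is registered
gcloud composer environments run toukai-onair-composer \
    --location asia-northeast1 list_dags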

You can also check the execution result logs from "view logs" in Airflow.

A sense of the cost

I realized it once I tried it, but it's still expensive. It was painful at pocket-money level, so I stopped running it long-term... (at this price, subscribing to YouTube Premium feels like the higher-QOL option...). For a sense of the pricing, see the pricing edition of Thorough Comparison / Explanation of Cloud Services (2020 edition).

Summary

I did build it, but it seemed too expensive to keep using personally...

Next time I'm wondering whether running Cloud Functions on a schedule would be cheaper, so I'm thinking of building that.

Have a good YouTube life!
