Identify the YouTube channel of Hikakin videos from thumbnail images using CNN

Overview

The top YouTuber in Japan is Hikakin (hereinafter "HIKAKIN"). I'm a big fan myself and watch his videos every day.

HIKAKIN operates four channels: HIKAKIN, HikakinTV, [HikakinGames](https://www.youtube.com/user/HikakinGames), and HikakinBlog. I thought it would be interesting to determine which channel a video belongs to using nothing but its thumbnail image, so I implemented it with machine learning.

Which channel is this video from?

We use TensorFlow as the machine learning framework. Below, I'd like to walk through the flow of the implementation, from collecting the images, to implementing a [CNN (Convolutional Neural Network)](https://ja.wikipedia.org/wiki/%E7%95%B3%E3%81%BF%E8%BE%BC%E3%81%BF%E3%83%8B%E3%83%A5%E3%83%BC%E3%83%A9%E3%83%AB%E3%83%8D%E3%83%83%E3%83%88%E3%83%AF%E3%83%BC%E3%82%AF) in TensorFlow, to actually making inferences.

code

quanon/pykin

version information

Python

| tool | version | use / purpose |
|:--|:--|:--|
| Python | 3.6.1 | |
| Selenium | 3.4.0 | Scraping |
| TensorFlow | 1.1.0 | Machine learning |
| NumPy | 1.12.1 | Numerical calculation |

Other tools

| tool | version | use |
|:--|:--|:--|
| ChromeDriver | 2.29 | To run Chrome from Selenium |
| iTerm2 | 3.0.15 | To display images in the terminal |

procedure

flow

  1. Get the URL of the thumbnail image
  2. Download the thumbnail image
  3. Divide the image into training data and test data
  4. Output CSV that associates data with label
  5. Implement a class that represents the CNN model
  6. Implement a function to read an image from CSV
  7. Learn the CNN model
  8. Test the trained model
  9. Infer with a trained model

1. Get the URL of the thumbnail image

code

fetch_urls.py


import os
import sys
from selenium import webdriver
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

def fetch_urls(channel):
  driver = webdriver.Chrome()
  url = os.path.join('https://www.youtube.com/user', channel, 'videos')
  driver.get(url)

  while True:
    driver.execute_script('window.scrollTo(0, document.body.scrollHeight);')

    try:
      # Wait until the "Load more" button becomes clickable.
      more = WebDriverWait(driver, 3).until(
        EC.element_to_be_clickable((By.CLASS_NAME, 'load-more-button'))
      )
    except StaleElementReferenceException:
      continue
    except TimeoutException:
      break

    more.click()

  selector = '.yt-thumb-default .yt-thumb-clip img'
  elements = driver.find_elements_by_css_selector(selector)
  src_list = [element.get_attribute('src') for element in elements]
  driver.quit()

  with open(f'urls/{channel}.txt', 'wt') as f:
    for src in src_list:
      print(src, file=f)

if __name__ == '__main__':
  fetch_urls(sys.argv[1])

Description

Use Selenium to interact with Google Chrome and collect thumbnail image URLs.

Specifically, the script first scrolls down the page until the "Load more" button at the bottom disappears, so that all thumbnail images are loaded in the browser. It then reads the src attribute of every img element corresponding to a thumbnail and writes the values to a text file under the `urls` directory.
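
If you want to collect the URLs for all four channels in one go, a small driver script works too. This is just a sketch; it assumes fetch_urls.py is importable from the same directory and that the urls directory already exists:

# A minimal sketch (assumption: fetch_urls.py is importable and urls/ exists).
from fetch_urls import fetch_urls

for channel in ['HIKAKIN', 'HikakinBlog', 'HikakinGames', 'HikakinTV']:
  fetch_urls(channel)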

Execution result

$ python fetch_urls.py HikakinTV

$ wc -l urls/HikakinTV.txt
    2178 urls/HikakinTV.txt
$ head -n 3 urls/HikakinTV.txt
https://i.ytimg.com/vi/ieHNKaG1KfA/hqdefault.jpg?custom=true&w=196&h=110&stc=true&jpg444=true&jpgq=90&sp=67&sigh=tRWLF3Pa-fZrEa5XTmPeHyVORv4
https://i.ytimg.com/vi/bolTkMSMrSA/hqdefault.jpg?custom=true&w=196&h=110&stc=true&jpg444=true&jpgq=90&sp=67&sigh=a0_PeYpyB9RrOhb3ySd4i7nJ9P8
https://i.ytimg.com/vi/jm4cK_XPqMA/hqdefault.jpg?custom=true&w=196&h=110&stc=true&jpg444=true&jpgq=90&sp=67&sigh=VymexTRKLE_wQaYtSKqrph1okcA

2. Download the thumbnail image

code

download.py


import os
import random
import re
import sys
import time
from urllib.request import urlretrieve


def download(channel):
  with open(f'urls/{channel}.txt', 'rt') as f:
    lines = f.readlines()

  dir = os.path.join('images', channel)
  if not os.path.exists(dir):
    os.makedirs(dir)

  for url in lines:
    # https://i.ytimg.com/vi/ieHNKaG1KfA/hqdefault.jpg
    # Use the ieHNKaG1KfA part of the URL as the image name.
    name = re.findall(r'(?<=vi/).*(?=/hqdefault)', url)[0]
    path = os.path.join(dir, f'{name}.jpg')

    if os.path.exists(path):
      print(f'{path} already exists')
      continue

    print(f'download {path}')
    urlretrieve(url, path)
    time.sleep(1 + random.randint(0, 2))

if __name__ == '__main__':
  download(sys.argv[1])

Description

Read the text file output by fetch_urls.py, then download each thumbnail image with urlretrieve().

By the way, the downloaded thumbnail images all have the same size, 196 x 110, so they should be easy to work with :blush:
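
If you want to double-check that, a quick size scan with Pillow (an extra dependency, not used elsewhere in this project) looks like this:

# A quick sanity check of the thumbnail sizes (assumes Pillow is installed).
import glob
from PIL import Image

sizes = {Image.open(path).size for path in glob.glob('images/HikakinTV/*.jpg')}
print(sizes)  # Expected: {(196, 110)}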

Execution result

$ python download.py HikakinTV
download images/HikakinTV/1ngTnVb9oF0.jpg
download images/HikakinTV/AGonzpJtyYU.jpg
images/HikakinTV/MvwxFi3ypNg.jpg already exists
(Abbreviation)

$ ls -1 images/HikakinTV | wc -l
    2178
$ ls -1 images/HikakinTV
-2DRamjx75o.jpg
-5Xk6i1jVhs.jpg
-9U3NOHsT1k.jpg
(Abbreviation)

3. Divide the image into training data and test data

code

split_images.py


import glob
import numpy as np
import os
import shutil


def clean_data():
  for dirpath, _, filenames in os.walk('data'):
    for filename in filenames:
      os.remove(os.path.join(dirpath, filename))


def split_pathnames(dirpath):
  pathnames = glob.glob(f'{dirpath}/*')
  np.random.shuffle(pathnames)

  # Reference: splitting a dataset (NumPy ndarray) at an arbitrary ratio
  # http://qiita.com/QUANON/items/e28335fa0e9f553d6ab1
  return np.split(pathnames, [int(0.7 * len(pathnames))])


def copy_images(data_dirname, class_dirname, image_pathnames):
  class_dirpath = os.path.join('data', data_dirname, class_dirname)

  if not os.path.exists(class_dirpath):
    os.makedirs(class_dirpath)

  for image_pathname in image_pathnames:
    image_filename = os.path.basename(image_pathname)
    shutil.copyfile(image_pathname,
      os.path.join(class_dirpath, image_filename))


def split_images():
  for class_dirname in os.listdir('images'):
    image_dirpath = os.path.join('images', class_dirname)

    if not os.path.isdir(image_dirpath):
      continue

    train_pathnames, test_pathnames = split_pathnames(image_dirpath)

    copy_images('train', class_dirname, train_pathnames)
    copy_images('test', class_dirname, test_pathnames)

if __name__ == '__main__':
  clean_data()
  split_images()

Description

The image files downloaded to the `images/<channel name>` directories are randomly split into training data and test data. Specifically, each image file in `images/<channel name>` is copied to either the `data/train/<channel name>` directory or the `data/test/<channel name>` directory so that the ratio of training data to test data is 7:3.

images/
 ├ HIKAKIN/
 ├ HikakinBlog/
 ├ HikakinGames/
 └ HikakinTV/

 ↓ copy so that train : test = 7 : 3

data/
 ├ train/
 │ ├ HIKAKIN/
 │ ├ HikakinBlog/
 │ ├ HikakinGames/
 │ └ HikakinTV/
 │
 └ test/
   ├ HIKAKIN/
   ├ HikakinBlog/
   ├ HikakinGames/
   └ HikakinTV/

Execution result

$ python split_images.py

$ find images -name '*.jpg' | wc -l
    3652
$ find data/train -name '*.jpg' | wc -l
    2555
$ find data/test -name '*.jpg' | wc -l
    1097

4. Output CSV that associates data with label

code

config.py


from enum import Enum


class Channel(Enum):
  HIKAKIN = 0
  HikakinBlog = 1
  HikakinGames = 2
  HikakinTV = 3

LOG_DIR = 'log'

write_csv_file.py


import os
import csv
from config import Channel, LOG_DIR


def write_csv_file(dir):
  with open(os.path.join(dir, 'data.csv'), 'wt') as f:
    for i, channel in enumerate(Channel):
      image_dir = os.path.join(dir, channel.name)
      writer = csv.writer(f, lineterminator='\n')

      for filename in os.listdir(image_dir):
        writer.writerow([os.path.join(image_dir, filename), i])

if __name__ == '__main__':
  write_csv_file('data/train')
  write_csv_file('data/test')

Description

This CSV associates each image file with its label. We will use it later to load the images and labels with TensorFlow for training and testing.
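
As a quick sanity check, you can count how many rows each label got, using only the standard library. A minimal sketch:

# Count the number of CSV rows per label.
import csv
from collections import Counter

with open('data/train/data.csv') as f:
  counts = Counter(label for _, label in csv.reader(f))

print(counts)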

Output result

$ python write_csv_file.py

$ cat data/train/data.csv
data/train/HIKAKIN/-c07QNF8lmM.jpg,0
data/train/HIKAKIN/0eHE-jfRQPo.jpg,0
(Abbreviation)
data/train/HikakinBlog/-OtqlF5BMNY.jpg,1
data/train/HikakinBlog/07XKtHfni1A.jpg,1
(Abbreviation)
data/train/HikakinGames/-2VyYsCkPZI.jpg,2
data/train/HikakinGames/-56bZU-iqQ4.jpg,2
(Abbreviation)
data/train/HikakinTV/-5Xk6i1jVhs.jpg,3
data/train/HikakinTV/-9U3NOHsT1k.jpg,3
(Abbreviation)
$ cat data/test/data.csv
data/test/HIKAKIN/-c07QNF8lmM.jpg,0
data/test/HIKAKIN/0eHE-jfRQPo.jpg,0
(Abbreviation)
data/test/HikakinBlog/2Z6GB9JjV4I.jpg,1
data/test/HikakinBlog/4eGZtFhZWIE.jpg,1
(Abbreviation)
data/test/HikakinGames/-FpYaEmiq1M.jpg,2
data/test/HikakinGames/-HFXWY1-M8M.jpg,2
(Abbreviation)
data/test/HikakinTV/-2DRamjx75o.jpg,3
data/test/HikakinTV/-9zt1EfKJYI.jpg,3
(Abbreviation)

5. Implement a class that represents the CNN model

code

cnn.py


import tensorflow as tf


class CNN:
  def __init__(self, image_size=48, class_count=2, color_channel_count=3):
    self.image_size = image_size
    self.class_count = class_count
    self.color_channel_count = color_channel_count

  # A function for inference.
  def inference(self, x, keep_prob, softmax=False):
    # Create a tf.Variable for storing the weights.
    def weight_variable(shape):
      initial = tf.truncated_normal(shape, stddev=0.1)

      return tf.Variable(initial)

    # Create a tf.Variable for storing the bias.
    def bias_variable(shape):
      initial = tf.constant(0.1, shape=shape)

      return tf.Variable(initial)

    # Perform the convolution.
    def conv2d(x, W):
      return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')

    # Perform max pooling with a 2x2 window and a stride of 2.
    def max_pool_2x2(x):
      return tf.nn.max_pool(x,
        ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1],
        padding='SAME')

    x_image = tf.reshape(
      x,
      [-1, self.image_size, self.image_size, self.color_channel_count])

    with tf.name_scope('conv1'):
      W_conv1 = weight_variable([5, 5, self.color_channel_count, 32])
      b_conv1 = bias_variable([32])
      h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)

    with tf.name_scope('pool1'):
      h_pool1 = max_pool_2x2(h_conv1)

    with tf.name_scope('conv2'):
      W_conv2 = weight_variable([5, 5, 32, 64])
      b_conv2 = bias_variable([64])
      h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)

    with tf.name_scope('pool2'):
      h_pool2 = max_pool_2x2(h_conv2)

    with tf.name_scope('fc1'):
      W_fc1 = weight_variable(
        [int(self.image_size / 4) * int(self.image_size / 4) * 64, 1024])
      b_fc1 = bias_variable([1024])
      h_pool2_flat = tf.reshape(
        h_pool2,
        [-1, int(self.image_size / 4) * int(self.image_size / 4) * 64])
      h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)
      h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

    with tf.name_scope('fc2'):
      W_fc2 = weight_variable([1024, self.class_count])
      b_fc2 = bias_variable([self.class_count])
      y = tf.matmul(h_fc1_drop, W_fc2) + b_fc2

    if softmax:
      with tf.name_scope('softmax'):
        y = tf.nn.softmax(y)

    return y

  # A loss function that computes the error between the inference result and the correct labels.
  def loss(self, y, labels):
    # Compute the cross entropy.
    # Note: do not pass values that have already gone through the softmax
    # function as the logits argument of tf.nn.softmax_cross_entropy_with_logits().
    cross_entropy = tf.reduce_mean(
      tf.nn.softmax_cross_entropy_with_logits(logits=y, labels=labels))
    tf.summary.scalar('cross_entropy', cross_entropy)

    return cross_entropy

  # A function for training.
  def training(self, cross_entropy, learning_rate=1e-4):
    train_step = tf.train.AdamOptimizer(learning_rate).minimize(cross_entropy)

    return train_step

  # Compute the accuracy.
  def accuracy(self, y, labels):
    correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(labels, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
    tf.summary.scalar('accuracy', accuracy)

    return accuracy

Description

An implementation of the CNN model. It's the heart of this project, but it is mostly the same as the [code](https://github.com/tensorflow/tensorflow/blob/master/tensorflow/examples/tutorials/mnist/mnist_deep.py) from the TensorFlow tutorial Deep MNIST for Experts. However, I turned it into a class to make it more reusable.
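
For reference, the class is meant to be used roughly like this. This is only a usage sketch with placeholder inputs; the real training code appears in step 7:

# A usage sketch of the CNN class (placeholder inputs, hypothetical shapes).
import tensorflow as tf
from cnn import CNN

cnn = CNN(image_size=48, class_count=4)
x = tf.placeholder(tf.float32, shape=[None, 48, 48, 3])
labels = tf.placeholder(tf.float32, shape=[None, 4])
keep_prob = tf.placeholder(tf.float32)

logits = cnn.inference(x, keep_prob)
loss = cnn.loss(logits, labels)
train_op = cnn.training(loss)
accuracy = cnn.accuracy(logits, labels)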

6. Implement a function to read an image from CSV

code

load_data.py


import tensorflow as tf


def load_data(csvpath, batch_size, image_size, class_count,
  shuffle=False, min_after_dequeue=1000):

  queue = tf.train.string_input_producer([csvpath], shuffle=shuffle)
  reader = tf.TextLineReader()
  key, value = reader.read(queue)
  imagepath, label = tf.decode_csv(value, [['imagepath'], [0]])

  jpeg = tf.read_file(imagepath)
  image = tf.image.decode_jpeg(jpeg, channels=3)
  image = tf.image.resize_images(image, [image_size, image_size])
  # Standardize the image to zero mean and unit variance.
  image = tf.image.per_image_standardization(image)

  # Convert the label value to a one-hot representation.
  label = tf.one_hot(label, depth=class_count, dtype=tf.float32)

  capacity = min_after_dequeue + batch_size * 3

  if shuffle:
    images, labels = tf.train.shuffle_batch(
      [image, label],
      batch_size=batch_size,
      num_threads=4,
      capacity=capacity,
      min_after_dequeue=min_after_dequeue)
  else:
    images, labels = tf.train.batch(
      [image, label],
      batch_size=batch_size,
      capacity=capacity)

  return images, labels

Description

A function that reads images and labels from the CSV; we will use it later for training and testing. During training, tf.train.shuffle_batch() is used so that the training data is shuffled; during testing, tf.train.batch() is used, with no shuffling.
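
Note that the returned tensors are fed by queue runners, which must be started explicitly. A minimal sketch of consuming one batch (the same pattern appears in train.py and test.py below):

# Pull a single batch out of load_data() (a standalone sketch).
import tensorflow as tf
from load_data import load_data

images, labels = load_data('data/train/data.csv', batch_size=50,
  image_size=48, class_count=4, shuffle=True)

with tf.Session() as sess:
  sess.run(tf.global_variables_initializer())
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(sess=sess, coord=coord)

  image_batch, label_batch = sess.run([images, labels])
  print(image_batch.shape, label_batch.shape)  # (50, 48, 48, 3) (50, 4)

  coord.request_stop()
  coord.join(threads)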

7. Learn the CNN model

code

train.py


import os
import tensorflow as tf
from cnn import CNN
from config import Channel, LOG_DIR
from load_data import load_data

# Suppress TensorFlow warning messages.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer('image_size', 48, 'Image size.')
flags.DEFINE_integer('step_count', 1000, 'Number of steps.')
flags.DEFINE_integer('batch_size', 50, 'Batch size.')
flags.DEFINE_float('learning_rate', 1e-4, 'Initial learning rate.')


def main():
  with tf.Graph().as_default():
    cnn = CNN(image_size=FLAGS.image_size, class_count=len(Channel))
    images, labels = load_data(
      'data/train/data.csv',
      batch_size=FLAGS.batch_size,
      image_size=FLAGS.image_size,
      class_count=len(Channel),
      shuffle=True)
    keep_prob = tf.placeholder(tf.float32)

    logits = cnn.inference(images, keep_prob)
    loss = cnn.loss(logits, labels)
    train_op = cnn.training(loss, FLAGS.learning_rate)
    accuracy = cnn.accuracy(logits, labels)

    saver = tf.train.Saver()
    init_op = tf.global_variables_initializer()

    with tf.Session() as sess:
      sess.run(init_op)
      coord = tf.train.Coordinator()
      threads = tf.train.start_queue_runners(sess=sess, coord=coord)

      summary_op = tf.summary.merge_all()
      summary_writer = tf.summary.FileWriter(LOG_DIR, sess.graph)

      for step in range(1, FLAGS.step_count + 1):
        _, loss_value, accuracy_value = sess.run(
          [train_op, loss, accuracy], feed_dict={keep_prob: 0.5})

        if step % 10 == 0:
          print(f'step {step}: training accuracy {accuracy_value}')
          summary = sess.run(summary_op, feed_dict={keep_prob: 1.0})
          summary_writer.add_summary(summary, step)

      coord.request_stop()
      coord.join(threads)

      save_path = saver.save(sess, os.path.join(LOG_DIR, 'model.ckpt'))

if __name__ == '__main__':
  main()

Description

Now we actually read the images and train the CNN. This time, training runs for 1,000 steps, and the training accuracy is printed every 10 steps. The learned parameters are saved to log/model.ckpt.

Execution result

$ python train.py
step 10: training accuracy 0.5600000023841858
step 20: training accuracy 0.47999998927116394
step 30: training accuracy 0.7200000286102295
(Abbreviation)
step 980: training accuracy 1.0
step 990: training accuracy 0.9800000190734863
step 1000: training accuracy 0.9800000190734863

Also, if you start TensorBoard in another terminal session and open http://0.0.0.0:6006 in a web browser, you can see graphs of how values such as the accuracy and the cross entropy (cross_entropy) on the training data change over time.

$ tensorboard --logdir ./log
Starting TensorBoard b'47' at http://0.0.0.0:6006
(Press CTRL+C to quit)

accuracy and cross_entropy

8. Test the trained model

code

test.py


import os
import tensorflow as tf
from cnn import CNN
from config import Channel, LOG_DIR
from load_data import load_data

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer('image_size', 48, 'Image size.')
flags.DEFINE_integer('batch_size', 1000, 'Batch size.')


def main():
  with tf.Graph().as_default():
    cnn = CNN(image_size=FLAGS.image_size, class_count=len(Channel))
    images, labels = load_data(
      'data/test/data.csv',
      batch_size=FLAGS.batch_size,
      image_size=FLAGS.image_size,
      class_count=len(Channel),
      shuffle=False)
    keep_prob = tf.placeholder(tf.float32)

    logits = cnn.inference(images, keep_prob)
    accuracy = cnn.accuracy(logits, labels)

    saver = tf.train.Saver()
    init_op = tf.global_variables_initializer()

    with tf.Session() as sess:
      sess.run(init_op)
      saver.restore(sess, os.path.join(LOG_DIR, 'model.ckpt'))
      coord = tf.train.Coordinator()
      threads = tf.train.start_queue_runners(sess=sess, coord=coord)

      # Disable dropout at test time.
      accuracy_value = sess.run(accuracy, feed_dict={keep_prob: 1.0})

      print(f'test accuracy: {accuracy_value}')

      coord.request_stop()
      coord.join(threads)

if __name__ == '__main__':
  main()

Description

Measure the performance of the trained model by computing the accuracy on the test data.

Execution result

$ find data/test -name '*.jpg' | wc -l
    1097
$ python test.py --batch_size 1097
test accuracy: 0.7657247185707092

This time, the test accuracy was about 76.6%. Guessing at random should give one in four, that is, 25.0%, so the model does seem to be learning something, but there is clearly room to improve the accuracy.

9. Infer with a trained model

code

inference.py


import numpy as np
import os
import sys
import tensorflow as tf
from cnn import CNN
from config import Channel, LOG_DIR

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer('image_size', 48, 'Image size.')


def load_image(imagepath, image_size):
  jpeg = tf.read_file(imagepath)
  image = tf.image.decode_jpeg(jpeg, channels=3)
  image = tf.cast(image, tf.float32)
  image = tf.image.resize_images(image, [image_size, image_size])
  image = tf.image.per_image_standardization(image)

  return image


def print_results(imagepath, softmax):
  os.system(f'imgcat {imagepath}')
  max_channel_name_length = max(len(channel.name) for channel in Channel)
  for channel, value in zip(Channel, softmax):
    print(f'{channel.name.ljust(max_channel_name_length + 1)}: {value}')

  print()

  prediction = Channel(np.argmax(softmax)).name
  for channel in Channel:
    if channel.name in imagepath:
      answer = channel.name
      break

  print(f'Prediction: {prediction}, Correct answer: {answer}')
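
The listing above stops at print_results(); the entry point is omitted. A minimal reconstruction, assuming the checkpoint that train.py saved to log/model.ckpt, would look like this:

# Reconstructed entry point (an assumption, based on the patterns in train.py and test.py).
def main(imagepath):
  with tf.Graph().as_default():
    cnn = CNN(image_size=FLAGS.image_size, class_count=len(Channel))
    image = load_image(imagepath, FLAGS.image_size)
    images = tf.expand_dims(image, axis=0)  # Add a batch dimension.
    keep_prob = tf.placeholder(tf.float32)

    softmax_op = cnn.inference(images, keep_prob, softmax=True)
    saver = tf.train.Saver()

    with tf.Session() as sess:
      saver.restore(sess, os.path.join(LOG_DIR, 'model.ckpt'))
      # Disable dropout at inference time.
      softmax = sess.run(softmax_op, feed_dict={keep_prob: 1.0})[0]

  print_results(imagepath, softmax)

if __name__ == '__main__':
  main(sys.argv[1])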

Description

Finally, we make inferences with the trained model. We look at the output of the softmax function and take the class with the largest value as the inference result.

By the way, [imgcat](https://raw.githubusercontent.com/gnachman/iTerm2/master/tests/imgcat), distributed on the iTerm2 Images page, lets you display images directly in the terminal. It is convenient because the input image and the inference result can be shown together on the terminal.

Execution result

Let's pick a few test images and run inference on them.

Successful examples :joy:

good

HikakinTV and HikakinGames are predicted correctly at a high rate, probably because they have a large amount of data.

Failure examples :sob:

bad

On the other hand, HIKAKIN and HikakinBlog are predicted correctly at a low rate, probably because they have little data.

Accuracy for each channel

I restricted the test data to a single channel at a time and computed the accuracy for each.

| Channel | Number of test images | Accuracy (%) |
|:--|--:|--:|
| HIKAKIN | 50 | 20.0 |
| HikakinBlog | 19 | 15.8 |
| HikakinGames | 374 | 68.4 |
| HikakinTV | 654 | 69.4 |

Sure enough, the channels with little data have a markedly lower accuracy.
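
To reproduce these per-channel numbers, one approach is to write a one-channel CSV and evaluate against it. A hedged sketch (filter_csv is a hypothetical helper, not part of the repository; the evaluation code would then read the generated CSV instead of data/test/data.csv):

# Hypothetical helper: keep only one channel's rows from the test CSV.
import csv
from config import Channel

def filter_csv(channel_name, src='data/test/data.csv'):
  dst = f'data/test/data_{channel_name}.csv'
  with open(src, 'rt') as f_in, open(dst, 'wt') as f_out:
    writer = csv.writer(f_out, lineterminator='\n')
    for path, label in csv.reader(f_in):
      if int(label) == Channel[channel_name].value:
        writer.writerow([path, label])
  return dst

print(filter_csv('HIKAKIN'))  # data/test/data_HIKAKIN.csv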

Improvement plan

  1. Increase the image size during learning.
  2. Increase the number of layers of CNN.
  3. Perform data augmentation to inflate the number of images (a sketch follows below).

In particular, I suspect the accuracy can be improved considerably just by increasing the amount of training data, so I plan to try that next.
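
For improvement 3, the augmentation could be hooked into load_data() right after the JPEG is decoded. A minimal sketch using tf.image ops (an assumption, not code from the repository):

# Candidate augmentation ops (a sketch; insert after decode_jpeg in load_data()).
import tensorflow as tf

def augment(image):
  image = tf.image.random_flip_left_right(image)
  image = tf.image.random_brightness(image, max_delta=0.2)
  image = tf.image.random_contrast(image, lower=0.8, upper=1.2)

  return image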

reference

There are countless reference materials out there, so I've carefully selected only the ones that were especially helpful. Official documentation is excluded.

Internet articles

Case study using TensorFlow

Articles by the gods :pray: :sparkles:

Image data input related in TensorFlow

Data input in TensorFlow was quite a pain. I'm truly grateful for the articles of those who tackled it before me :pray: :sparkles:

Other

Books
