Apropos Japans Top-YouTuber, das ist richtig HIKAKIN (im Folgenden als "HIKAKIN" bezeichnet). Ich liebe es auch und schaue jeden Tag Videos.
Hikakin ist HIKAKIN, HikakinTV, [HikakinGames](https: // www. Wir betreiben vier Kanäle: youtube.com/user/HikakinGames) und HikakinBlog. Hier hielt ich es für interessant, anhand der Informationen, die als Miniaturbilder bezeichnet werden, zu bestimmen, zu welchem Kanal ein Video gehört, und implementierte es daher mithilfe von maschinellem Lernen.
Verwenden Sie TensorFlow als Framework für maschinelles Lernen. Verwenden Sie dann vom Sammeln von Bildern TensorFlow zu [CNN (Convolutional Neural Network)](https://ja.wikipedia.org/wiki/%E7%95%B3%E3%81%BF%E8%BE%BC%E3 % 81% BF% E3% 83% 8B% E3% 83% A5% E3% 83% BC% E3% 83% A9% E3% 83% AB% E3% 83% 8D% E3% 83% 83% E3% 83 Ich möchte den Implementierungsfluss vom Punkt der Implementierung von% 88% E3% 83% AF% E3% 83% BC% E3% 82% AF) bis zum Punkt der tatsächlichen Schlussfolgerung einführen.
Python
Werkzeug | Ausführung | Verwendung / Zweck |
---|---|---|
Python | 3.6.1 | |
Selenium | 3.4.0 | Schaben |
TensorFlow | 1.1.0 | Maschinelles Lernen |
NumPy | 1.12.1 | Numerische Berechnung |
Werkzeug | Ausführung | Verwenden |
---|---|---|
ChromeDriver | 2.29 | So führen Sie Chrome auf Selenium aus |
iTerm2 | 3.0.15 | Anzeigen des Bildes auf dem Terminal |
fetch_urls.py
import os
import sys
from selenium import webdriver
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
def fetch_urls(channel):
driver = webdriver.Chrome()
url = os.path.join('https://www.youtube.com/user', channel, 'videos')
driver.get(url)
while True:
driver.execute_script('window.scrollTo(0, document.body.scrollHeight);')
try:
#Warten Sie, bis die Schaltfläche "Mehr laden" anklickbar ist.
more = WebDriverWait(driver, 3).until(
EC.element_to_be_clickable((By.CLASS_NAME, 'load-more-button'))
)
except StaleElementReferenceException:
continue;
except TimeoutException:
break;
more.click()
selector = '.yt-thumb-default .yt-thumb-clip img'
elements = driver.find_elements_by_css_selector(selector)
src_list = [element.get_attribute('src') for element in elements]
driver.quit()
with open(f'urls/{channel}.txt', 'wt') as f:
for src in src_list:
print(src, file=f)
if __name__ == '__main__':
fetch_urls(sys.argv[1])
Verwenden Sie Selenium, um mit Google Chrome zu interagieren und URLs für Miniaturbilder zu sammeln.
Scrollen Sie zuerst den Bildschirm nach unten, bis die Schaltfläche "Mehr laden" am unteren Bildschirmrand verschwindet. Dadurch werden alle Miniaturbilder auf dem Browserbildschirm angezeigt. Holen Sie sich danach den Wert des src-Attributs aller img-Elemente, die dem Miniaturbild entsprechen, und schreiben Sie ihn in die Textdatei im Verzeichnis urls
.
$ python fetch_urls.py HikakinTV
$ wc -l urls/HikakinTV.txt
2178 urls/HikakinTV.txt
$ head -n 3 urls/HikakinTV.txt
https://i.ytimg.com/vi/ieHNKaG1KfA/hqdefault.jpg?custom=true&w=196&h=110&stc=true&jpg444=true&jpgq=90&sp=67&sigh=tRWLF3Pa-fZrEa5XTmPeHyVORv4
https://i.ytimg.com/vi/bolTkMSMrSA/hqdefault.jpg?custom=true&w=196&h=110&stc=true&jpg444=true&jpgq=90&sp=67&sigh=a0_PeYpyB9RrOhb3ySd4i7nJ9P8
https://i.ytimg.com/vi/jm4cK_XPqMA/hqdefault.jpg?custom=true&w=196&h=110&stc=true&jpg444=true&jpgq=90&sp=67&sigh=VymexTRKLE_wQaYtSKqrph1okcA
download.rb
import os
import random
import re
import sys
import time
from urllib.request import urlretrieve
def download(channel):
with open(f'urls/{channel}.txt', 'rt') as f:
lines = f.readlines()
dir = os.path.join('images', channel)
if not os.path.exists(dir):
os.makedirs(dir)
for url in lines:
# https://i.ytimg.com/vi/ieHNKaG1KfA/hqdefault.jpg
#Verwenden Sie den ieHNKaG1KfA-Teil der URL als Bildnamen.
name = re.findall(r'(?<=vi/).*(?=/hqdefault)', url)[0]
path = os.path.join(dir, f'{name}.jpg')
if os.path.exists(path):
print(f'{path} already exists')
continue
print(f'download {path}')
urlretrieve(url, path)
time.sleep(1 + random.randint(0, 2))
if __name__ == '__main__':
download(sys.argv[1])
Verwenden Sie nach dem Lesen der von fetch_urls.py
ausgegebenen Textdatei urlretrieve (). Laden Sie das Miniaturbild herunter.
Übrigens haben alle heruntergeladenen Miniaturbilder eine einheitliche Größe von 196 x 110. Es sollte leicht zu handhaben sein: erröten:
$ python download.py HikakinTV
download images/HikakinTV/1ngTnVb9oF0.jpg
download images/HikakinTV/AGonzpJtyYU.jpg
images/HikakinTV/MvwxFi3ypNg.jpg already exists
(Abkürzung)
$ ls -1 images/HikakinTV | wc -l
2178
$ ls -1 images/HikakinTV
-2DRamjx75o.jpg
-5Xk6i1jVhs.jpg
-9U3NOHsT1k.jpg
(Abkürzung)
split_images.py
import glob
import numpy as np
import os
import shutil
def clean_data():
for dirpath, _, filenames in os.walk('data'):
for filename in filenames:
os.remove(os.path.join(dirpath, filename))
def split_pathnames(dirpath):
pathnames = glob.glob(f'{dirpath}/*')
np.random.shuffle(pathnames)
#Referenz:Datensatz mit NumPy(ndarray)In beliebige Proportionen teilen
# http://qiita.com/QUANON/items/e28335fa0e9f553d6ab1
return np.split(pathnames, [int(0.7 * len(pathnames))])
def copy_images(data_dirname, class_dirname, image_pathnames):
class_dirpath = os.path.join('data', data_dirname, class_dirname)
if not os.path.exists(class_dirpath):
os.makedirs(class_dirpath)
for image_pathname in image_pathnames:
image_filename = os.path.basename(image_pathname)
shutil.copyfile(image_pathname,
os.path.join(class_dirpath, image_filename))
def split_images():
for class_dirname in os.listdir('images'):
image_dirpath = os.path.join('images', class_dirname)
if not os.path.isdir(image_dirpath):
continue
train_pathnames, test_pathnames = split_pathnames(image_dirpath)
copy_images('train', class_dirname, train_pathnames)
copy_images('test', class_dirname, test_pathnames)
if __name__ == '__main__':
clean_data()
split_images()
Die in das Verzeichnis "images / channel name" heruntergeladenen Bilddateien werden zufällig in Trainingsdaten und Testdaten unterteilt. Insbesondere werden die Bilddateien im Verzeichnis "Bilder / Kanalname" im Verzeichnis "Daten / Zug / Kanalname" oder im Verzeichnis "Daten / Test / Kanalname" gespeichert, so dass das Verhältnis von Trainingsdaten zu Testdaten 7: 3 beträgt. Kopieren nach.
images/
├ HIKAKIN/
├ HikakinBlog/
├ HikakinGames/
└ HikakinTV/
↓ train : test = 7 :Kopieren Sie, um 3 zu sein
data/
├ train/
│ ├ HIKAKIN/
│ ├ HikakinBlog/
│ ├ HikakinGames/
│ └ HikakinTV/
│
└ test/
├ HIKAKIN/
├ HikakinBlog/
├ HikakinGames/
└ HikakinTV/
$ python split_images.py
$ find images -name '*.jpg' | wc -l
3652
$ find data/train -name '*.jpg' | wc -l
2555
$ find data/test -name '*.jpg' | wc -l
1097
config.py
from enum import Enum
class Channel(Enum):
HIKAKIN = 0
HikakinBlog = 1
HikakinGames = 2
HikakinTV = 3
LOG_DIR = 'log'
write_csv_file.py
import os
import csv
from config import Channel, LOG_DIR
def write_csv_file(dir):
with open(os.path.join(dir, 'data.csv'), 'wt') as f:
for i, channel in enumerate(Channel):
image_dir = os.path.join(dir, channel.name)
writer = csv.writer(f, lineterminator='\n')
for filename in os.listdir(image_dir):
writer.writerow([os.path.join(image_dir, filename), i])
if __name__ == '__main__':
write_csv_file('data/train')
write_csv_file('data/test')
Es wird später in Schulungen und Tests zum Laden von Bildern und Etiketten mit TensorFlow verwendet.
$ python write_csv_file.py
$ cat data/train/data.csv
data/test/HIKAKIN/-c07QNF8lmM.jpg,0
data/test/HIKAKIN/0eHE-jfRQPo.jpg,0
(Abkürzung)
data/train/HikakinBlog/-OtqlF5BMNY.jpg,1
data/train/HikakinBlog/07XKtHfni1A.jpg,1
(Abkürzung)
data/train/HikakinGames/-2VyYsCkPZI.jpg,2
data/train/HikakinGames/-56bZU-iqQ4.jpg,2
(Abkürzung)
data/train/HikakinTV/-5Xk6i1jVhs.jpg,3
data/train/HikakinTV/-9U3NOHsT1k.jpg,3
(Abkürzung)
$ cat data/test/data.csv
data/test/HIKAKIN/-c07QNF8lmM.jpg,0
data/test/HIKAKIN/0eHE-jfRQPo.jpg,0
(Abkürzung)
data/test/HikakinBlog/2Z6GB9JjV4I.jpg,1
data/test/HikakinBlog/4eGZtFhZWIE.jpg,1
(Abkürzung)
data/test/HikakinGames/-FpYaEmiq1M.jpg,2
data/test/HikakinGames/-HFXWY1-M8M.jpg,2
(Abkürzung)
data/test/HikakinTV/-2DRamjx75o.jpg,3
data/test/HikakinTV/-9zt1EfKJYI.jpg,3
(Abkürzung)
cnn.py
import tensorflow as tf
class CNN:
def __init__(self, image_size=48, class_count=2, color_channel_count=3):
self.image_size = image_size
self.class_count = class_count
self.color_channel_count = color_channel_count
#Inferenzfunktion.
def inference(self, x, keep_prob, softmax=False):
#Tf zum Speichern von Gewichten.Erstellen Sie eine Variable.
def weight_variable(shape):
initial = tf.truncated_normal(shape, stddev=0.1)
return tf.Variable(initial)
#Tf, um die Vorspannung zu speichern.Erstellen Sie eine Variable.
def bias_variable(shape):
initial = tf.constant(0.1, shape=shape)
return tf.Variable(initial)
#Falten Sie es nach oben.
def conv2d(x, W):
return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')
# [2x2]Das Pooling wird mit der Größe und dem Ausmaß der Bewegung 2 durchgeführt.
def max_pool_2x2(x):
return tf.nn.max_pool(x,
ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1],
padding='SAME')
x_image = tf.reshape(
x,
[-1, self.image_size, self.image_size, self.color_channel_count])
with tf.name_scope('conv1'):
W_conv1 = weight_variable([5, 5, self.color_channel_count, 32])
b_conv1 = bias_variable([32])
h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)
with tf.name_scope('pool1'):
h_pool1 = max_pool_2x2(h_conv1)
with tf.name_scope('conv2'):
W_conv2 = weight_variable([5, 5, 32, 64])
b_conv2 = bias_variable([64])
h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)
with tf.name_scope('pool2'):
h_pool2 = max_pool_2x2(h_conv2)
with tf.name_scope('fc1'):
W_fc1 = weight_variable(
[int(self.image_size / 4) * int(self.image_size / 4) * 64, 1024])
b_fc1 = bias_variable([1024])
h_pool2_flat = tf.reshape(
h_pool2,
[-1, int(self.image_size / 4) * int(self.image_size / 4) * 64])
h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)
h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)
with tf.name_scope('fc2'):
W_fc2 = weight_variable([1024, self.class_count])
b_fc2 = bias_variable([self.class_count])
y = tf.matmul(h_fc1_drop, W_fc2) + b_fc2
if softmax:
with tf.name_scope('softmax'):
y = tf.nn.softmax(y)
return y
#Verlustfunktion zur Berechnung des Fehlers zwischen dem Inferenzergebnis und der richtigen Antwort.
def loss(self, y, labels):
#Berechnen Sie die Kreuzentropie.
# tf.nn.softmax_cross_entropy_with_logits argument logits
#Geben Sie keine Variablen an, auf die die Softmax-Funktion angewendet wird.
cross_entropy = tf.reduce_mean(
tf.nn.softmax_cross_entropy_with_logits(logits=y, labels=labels))
tf.summary.scalar('cross_entropy', cross_entropy)
return cross_entropy
#Funktionen zum Lernen
def training(self, cross_entropy, learning_rate=1e-4):
train_step = tf.train.AdamOptimizer(learning_rate).minimize(cross_entropy)
return train_step
#Richtige Antwortrate(accuracy)Fragen.
def accuracy(self, y, labels):
correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(labels, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
tf.summary.scalar('accuracy', accuracy)
return accuracy
Eine Implementierung des CNN-Modells. Es ist das Herzstück dieses Projekts, aber es geht hauptsächlich um das TensorFlow-Tutorial Deep MNIST für Experten [Code](https: // github. Entspricht com / tensorflow / tensorflow / blob / master / tensorflow / examples / tutorials / mnist / mnist_deep.py). Es wird jedoch klassifiziert, um die Vielseitigkeit zu erhöhen.
load_data.py
import tensorflow as tf
def load_data(csvpath, batch_size, image_size, class_count,
shuffle=False, min_after_dequeue=1000):
queue = tf.train.string_input_producer([csvpath], shuffle=shuffle)
reader = tf.TextLineReader()
key, value = reader.read(queue)
imagepath, label = tf.decode_csv(value, [['imagepath'], [0]])
jpeg = tf.read_file(imagepath)
image = tf.image.decode_jpeg(jpeg, channels=3)
image = tf.image.resize_images(image, [image_size, image_size])
#Im Durchschnitt auf 0 skalieren.
image = tf.image.per_image_standardization(image)
#Beschriften Sie den Wert eins-In heißen Ausdruck konvertieren.
label = tf.one_hot(label, depth=class_count, dtype=tf.float32)
capacity = min_after_dequeue + batch_size * 3
if shuffle:
images, labels = tf.train.shuffle_batch(
[image, label],
batch_size=batch_size,
num_threads=4,
capacity=capacity,
min_after_dequeue=min_after_dequeue)
else:
images, labels = tf.train.batch(
[image, label],
batch_size=batch_size,
capacity=capacity)
return images, labels
Eine Funktion zum Lesen von Bildern und Etiketten aus CSV. Es wird später beim Lernen und Testen verwendet. Verwenden Sie tf.train.shuffle_batch (), um Testdaten während des Trainings zu mischen, ohne während des Tests zu mischen Ich gehe davon aus, dass Sie [tf.train.batch ()] verwenden (https://www.tensorflow.org/api_docs/python/tf/train/batch).
train.py
import os
import tensorflow as tf
from cnn import CNN
from config import Channel, LOG_DIR
from load_data import load_data
#Unterdrücken Sie TensorFlow-Warnmeldungen.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer('image_size', 48, 'Image size.')
flags.DEFINE_integer('step_count', 1000, 'Number of steps.')
flags.DEFINE_integer('batch_size', 50, 'Batch size.')
flags.DEFINE_float('learning_rate', 1e-4, 'Initial learning rate.')
def main():
with tf.Graph().as_default():
cnn = CNN(image_size=FLAGS.image_size, class_count=len(Channel))
images, labels = load_data(
'data/train/data.csv',
batch_size=FLAGS.batch_size,
image_size=FLAGS.image_size,
class_count=len(Channel),
shuffle=True)
keep_prob = tf.placeholder(tf.float32)
logits = cnn.inference(images, keep_prob)
loss = cnn.loss(logits, labels)
train_op = cnn.training(loss, FLAGS.learning_rate)
accuracy = cnn.accuracy(logits, labels)
saver = tf.train.Saver()
init_op = tf.global_variables_initializer()
with tf.Session() as sess:
sess.run(init_op)
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
summary_op = tf.summary.merge_all()
summary_writer = tf.summary.FileWriter(LOG_DIR, sess.graph)
for step in range(1, FLAGS.step_count + 1):
_, loss_value, accuracy_value = sess.run(
[train_op, loss, accuracy], feed_dict={keep_prob: 0.5})
if step % 10 == 0:
print(f'step {step}: training accuracy {accuracy_value}')
summary = sess.run(summary_op, feed_dict={keep_prob: 1.0})
summary_writer.add_summary(summary, step)
coord.request_stop()
coord.join(threads)
save_path = saver.save(sess, os.path.join(LOG_DIR, 'model.ckpt'))
if __name__ == '__main__':
main()
Lesen Sie das Bild und lernen Sie CNN. Dieses Mal werden 1.000 Lernschritte ausgeführt und alle 10 Schritte wird die richtige Antwortrate (Genauigkeit) ausgegeben. Speichern Sie die gelernten Parameter in log / model.ckpt
.
$ python train.py
step 10: training accuracy 0.5600000023841858
step 20: training accuracy 0.47999998927116394
step 30: training accuracy 0.7200000286102295
(Abkürzung)
step 980: training accuracy 1.0
step 990: training accuracy 0.9800000190734863
step 1000: training accuracy 0.9800000190734863
Wenn Sie TensorBoard in einer anderen Sitzung des Terminals starten und mit einem Webbrowser auf http://0.0.0.0:6006 zugreifen, wird der Übergang von Werten wie korrekter Antwortrate (Genauigkeit) und Kreuzentropie (Kreuzentropie) für Trainingsdaten grafisch dargestellt. Du kannst nachschauen.
$ tensorboard --logdir ./log
Starting TensorBoard b'47' at http://0.0.0.0:6006
(Press CTRL+C to quit)
test.py
import os
import tensorflow as tf
from cnn import CNN
from config import Channel, LOG_DIR
from load_data import load_data
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer('image_size', 48, 'Image size.')
flags.DEFINE_integer('batch_size', 1000, 'Batch size.')
def main():
with tf.Graph().as_default():
cnn = CNN(image_size=FLAGS.image_size, class_count=len(Channel))
images, labels = load_data(
'data/test/data.csv',
batch_size=FLAGS.batch_size,
image_size=FLAGS.image_size,
class_count=len(Channel),
shuffle=False)
keep_prob = tf.placeholder(tf.float32)
logits = cnn.inference(images, keep_prob)
accuracy = cnn.accuracy(logits, labels)
saver = tf.train.Saver()
init_op = tf.global_variables_initializer()
with tf.Session() as sess:
sess.run(init_op)
saver.restore(sess, os.path.join(LOG_DIR, 'model.ckpt'))
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
accuracy_value = sess.run(accuracy, feed_dict={keep_prob: 0.5})
print(f'test accuracy: {accuracy_value}')
coord.request_stop()
coord.join(threads)
if __name__ == '__main__':
main()
Messen Sie die Genauigkeit des trainierten Modells, indem Sie die richtige Antwortrate (Genauigkeit) für die Testdaten ermitteln.
$ find data/test -name '*.jpg' | wc -l
1097
$ python test.py --batch_size 1097
test accuracy: 0.7657247185707092
Diesmal lag die korrekte Antwortrate bei 76,6%. Wenn Sie zufällig schließen, sollte es ein Viertel oder 25,0% sein, damit Sie richtig lernen können, aber es scheint Raum für Verbesserungen der Genauigkeit zu geben.
inference.py
import numpy as np
import os
import sys
import tensorflow as tf
from cnn import CNN
from config import Channel, LOG_DIR
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer('image_size', 48, 'Image size.')
def load_image(imagepath, image_size):
jpeg = tf.read_file(imagepath)
image = tf.image.decode_jpeg(jpeg, channels=3)
image = tf.cast(image, tf.float32)
image = tf.image.resize_images(image, [image_size, image_size])
image = tf.image.per_image_standardization(image)
return image
def print_results(imagepath, softmax):
os.system(f'imgcat {imagepath}')
mex_channel_name_length = max(len(channel.name) for channel in Channel)
for channel, value in zip(Channel, softmax):
print(f'{channel.name.ljust(mex_channel_name_length + 1)}: {value}')
print()
prediction = Channel(np.argmax(softmax)).name
for channel in Channel:
if channel.name in imagepath:
answer = channel.name
break
print(f'Vermuten: {prediction},Richtige Antwort: {answer}')
Machen Sie abschließend anhand des trainierten Modells Rückschlüsse. Sehen Sie sich das Ergebnis der Softmax-Funktion an und verwenden Sie die Klasse mit dem größeren Wert als Inferenzergebnis.
Übrigens wird es auf den Seiten iTerm2 und Images von iTerm2 imgcat verteilt. Sie können (: //raw.githubusercontent.com/gnachman/iTerm2/master/tests/imgcat) verwenden, um das Bild so auszugeben, wie es sich auf dem Terminal befindet. Dies ist praktisch, da das Eingabebild und das Inferenzergebnis kombiniert und auf dem Terminal ausgegeben werden können.
Lassen Sie uns einige Testdaten auswählen und daraus schließen.
HikakinTV und HikakinGames haben einen hohen Prozentsatz an richtigen Antworten, wahrscheinlich aufgrund der großen Datenmenge.
Andererseits haben HIKAKIN und HikakinBlog einen geringen Prozentsatz an richtigen Antworten, wahrscheinlich weil die Anzahl der Daten gering ist.
Ich habe die Testdaten auf nur einen Kanal eingegrenzt und die richtige Antwortrate berechnet.
Kanal | Anzahl der Testdaten | Richtige Antwortrate(%) |
---|---|---|
HIKAKIN | 50 | 20.0 |
HikakinBlog | 19 | 15.8 |
HikakinGames | 374 | 68.4 |
HikakinTV | 654 | 69.4 |
Wenn nur wenige Daten vorliegen, ist die richtige Antwortrate schließlich äußerst schlecht.
Insbesondere bin ich der Meinung, dass es erheblich verbessert werden kann, wenn nur die Anzahl der Trainingsdaten erhöht wird. Deshalb werde ich es in Zukunft versuchen.
Es gibt unzählige Referenzmaterialien, daher werden wir nur diejenigen sorgfältig auswählen, die besonders hilfreich waren. Auch offizielle Dokumente sind ausgeschlossen.
Artikel der Götter: bete :: funkelt:
Ich war sehr verärgert über die Dateneingabe von TensorFlow. Ich schätze die Artikel meiner Vorgänger sehr: bete :: funkelt:
Recommended Posts