In diesem Artikel wird ein Lernmodell vorgestellt, das die von RG (Reber Grammar), ERG (Embeded-) und CERG (Continuous-) erzeugte Sprache vorhersagt.
Als Tutorial zur Implementierung von TensorFlow von LSTM wird mit einem Sprachdatensatz namens PTB (Penn Treebank) gelernt, der jedoch als Einführung schwer zu verstehen ist. Daher werden wir als Einführung ein Modell lernen, das die Sprache vorhersagt, die aus den Algorithmen (RG, ERG, CERG) generiert wird, die automatisch eine Sprache mit acht Zeichen generieren.
https://www.willamette.edu/~gorr/classes/cs449/reber.html Die Referenzseite von ist leicht zu verstehen. Beispiel: "BTSSXSE.BPVVR" ist eine der RGs und "BPBTSXXVVEPE" ist eine der ERGs. CERG ist eine Sprache, in der das Endzeichen "." Aus ERG entfernt wird. Dieser Artikel sagt diese achtstelligen Sprachen voraus.
Der Quellcode von github ist unten aufgeführt. Wir haben den Vorgang mit Python3 und TensorFlow API r1.1 bestätigt. TensorFlow ändert sich stark für jede Version. Wenn die Version also anders ist, sollten Sie nicht glauben, dass sie so funktioniert, wie sie ist.
RG_prediction_model.py
#! /usr/local/bin/python
# -*- coding:utf-8 -*-
import tensorflow as tf
import numpy as np
import random
from create_RG import ERG_generator
num_of_sample_length = 10000
class RG_predict_model:
def __init__(self, data_model):
self.num_of_hidden_nodes = 60
self.chunk_size = 20
self.model_file_name = "./tmp/model.ckpt"
self.batch_size = 100
self.forget_bias = 0.8
self.learning_rate = 0.001
self.num_of_epochs = 50000
try:
#train data set
self.rggen = data_model()
self.rggen.generate(num_of_sample_length)
self.num_of_output_nodes = self.rggen.CHAR_VEC
self.num_of_input_nodes = self.rggen.CHAR_VEC
#test data set
self.test_rggen = data_model()
self.test_rggen.generate(num_of_sample_length)
except:
print("could not specify generator model")
raise
def inference(self, input_ph, istate_ph):
with tf.name_scope("inference") as scope:
weight1_var = tf.Variable(tf.truncated_normal(
[self.num_of_input_nodes, self.num_of_hidden_nodes], stddev=0.1), name="weight1")
weight2_var = tf.Variable(tf.truncated_normal(
[self.num_of_hidden_nodes, self.num_of_output_nodes], stddev=0.1), name="weight2")
bias1_var = tf.Variable(tf.truncated_normal(
[self.num_of_hidden_nodes], stddev=0.1), name="bias1")
bias2_var = tf.Variable(tf.truncated_normal(
[self.num_of_output_nodes], stddev=0.1), name="bias2")
in1 = tf.transpose(input_ph, [1, 0, 2]) #(chunk_size, batch_size, CHAR_VEC_DIM)
in2 = tf.reshape(in1, [-1, self.num_of_input_nodes]) #(chunk_size * batch_size, CHAR_VEC_DIM)
in3 = tf.matmul(in2, weight1_var) + bias1_var #(chunk_size * batch_size, num_of_hidden_nodes)
in4 = tf.split(in3, self.chunk_size, axis=0) #chunk_size * (batch_size, num_of_hidden_nodes)
cell = tf.contrib.rnn.BasicLSTMCell(
self.num_of_hidden_nodes, forget_bias=self.forget_bias, state_is_tuple=False)
outputs, states = tf.contrib.rnn.static_rnn(cell, in4, initial_state=istate_ph)
output = tf.matmul(outputs[-1], weight2_var) + bias2_var
return output
def evaluate(self, output, label):
with tf.name_scope("evaluate") as scope:
prediction = tf.nn.softmax(output)
correct_prediction = tf.equal(tf.argmax(output,1),tf.argmax(label,1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
tf.summary.scalar("accuracy", accuracy)
return prediction, accuracy
def loss(self, output, label):
with tf.name_scope("loss") as scope:
loss = tf.reduce_mean(tf.losses.softmax_cross_entropy(label, output))
tf.summary.scalar("loss", loss)
return loss
def training(self, loss):
with tf.name_scope("training") as scope:
optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate).minimize(loss)
return optimizer
def train(self):
input_ph = tf.placeholder(tf.float32, [None, self.chunk_size, self.num_of_input_nodes], name="input")
label_ph = tf.placeholder(tf.float32, [None, self.num_of_input_nodes], name="label")
istate_ph = tf.placeholder(tf.float32, [None, self.num_of_hidden_nodes * 2], name="istate")
prediction = self.inference(input_ph, istate_ph)
loss = self.loss(prediction, label_ph)
optimizer = self.training(loss)
evaluater = self.evaluate(prediction, label_ph)
summary = tf.summary.merge_all()
with tf.Session() as sess:
summary_writer = tf.summary.FileWriter("./tmp/RG_log", graph=sess.graph)
sess.run(tf.global_variables_initializer())
####### train ########
for epoch in range(self.num_of_epochs):
inputs, labels = self.rggen.get_batch(self.batch_size, self.chunk_size)
train_dict = {
input_ph: inputs,
label_ph: labels,
istate_ph: np.zeros((self.batch_size, self.num_of_hidden_nodes * 2)),
}
sess.run([optimizer], feed_dict=train_dict)
if (epoch) % 100 ==0:
summary_str, train_loss, (prediction, acc) = sess.run([summary, loss, evaluater], feed_dict=train_dict)
print("train#%d, loss: %e, accuracy: %e" % (epoch, train_loss, acc))
summary_writer.add_summary(summary_str, epoch)
####### test #########
inputs, labels = self.test_rggen.get_batch(self.batch_size, self.chunk_size)
test_dict = {
input_ph: inputs,
label_ph: labels,
istate_ph: np.zeros((self.batch_size, self.num_of_hidden_nodes * 2)),
}
prediction, acc = sess.run(evaluater, feed_dict=test_dict)
for pred, label in zip(prediction, labels):
print(np.argmax(pred) == np.argmax(label))
print(['{:.2f}'.format(n) for n in pred])
print(['{:.2f}'.format(n) for n in label])
####### save ########
print("Training has done successfully")
saver = tf.train.Saver()
saver.save(sess, self.model_file_name)
if __name__ == '__main__':
random.seed(0)
np.random.seed(0)
tf.set_random_seed(0)
rg_model = RG_predict_model(ERG_generator)
rg_model.train()
Als nächstes werden die Details der Reihe nach erklärt.
python
def __init__(self, data_model):
self.num_of_hidden_nodes = 60
self.chunk_size = 20
self.model_file_name = "./tmp/model.ckpt"
self.batch_size = 100
self.forget_bias = 0.8
self.learning_rate = 0.001
self.num_of_epochs = 50000
try:
#train data set
self.rggen = data_model()
self.rggen.generate(num_of_sample_length)
self.num_of_output_nodes = self.rggen.CHAR_VEC
self.num_of_input_nodes = self.rggen.CHAR_VEC
#test data set
self.test_rggen = data_model()
self.test_rggen.generate(num_of_sample_length)
except:
print("could not specify generator model")
raise
Da der Eingabevektor durch einen heißen Vektor dargestellt wird, wird er durch einen 8-dimensionalen Vektor dargestellt (z. B. B = (1,0,0,0,0,0,0,0)), jedoch vollständig vor der Eingabe in die LSTM-Zelle. Fügen Sie eine verbundene Ebene ein und erhöhen Sie den Feature-Betrag auf num_of_hidden_nodes = 60-dimensionaler Vektor. LSTM erfordert einen Parameter, der bestimmt, wie viele vorherige Eingaben sich auf die Ausgabe auswirken. Dies wird durch chunk_size angegeben. Dieses Mal werden 20 aufeinanderfolgende Zeichen für eine Vorhersage eingegeben. Eines von EG_model, ERG_model und CERG_model wird in das Argument data_model eingegeben.
python
def inference(self, input_ph, istate_ph):
with tf.name_scope("inference") as scope:
weight1_var = tf.Variable(tf.truncated_normal(
[self.num_of_input_nodes, self.num_of_hidden_nodes], stddev=0.1), name="weight1")
weight2_var = tf.Variable(tf.truncated_normal(
[self.num_of_hidden_nodes, self.num_of_output_nodes], stddev=0.1), name="weight2")
bias1_var = tf.Variable(tf.truncated_normal(
[self.num_of_hidden_nodes], stddev=0.1), name="bias1")
bias2_var = tf.Variable(tf.truncated_normal(
[self.num_of_output_nodes], stddev=0.1), name="bias2")
in1 = tf.transpose(input_ph, [1, 0, 2]) #(chunk_size, batch_size, CHAR_VEC_DIM)
in2 = tf.reshape(in1, [-1, self.num_of_input_nodes]) #(chunk_size * batch_size, CHAR_VEC_DIM)
in3 = tf.matmul(in2, weight1_var) + bias1_var #(chunk_size * batch_size, num_of_hidden_nodes)
in4 = tf.split(in3, self.chunk_size, axis=0) #chunk_size * (batch_size, num_of_hidden_nodes)
cell = tf.contrib.rnn.BasicLSTMCell(
self.num_of_hidden_nodes, forget_bias=self.forget_bias, state_is_tuple=False)
outputs, states = tf.contrib.rnn.static_rnn(cell, in4, initial_state=istate_ph)
output = tf.matmul(outputs[-1], weight2_var) + bias2_var
return output
Eingang (8D) -> vollständig verbundene Schicht -> (60D) -> LSTM-> (60D) -> vollständig verbundene Schicht -> Ausgang (8D). in1 ~ in4 werden nur konvertiert, um die Berechnung von Wx + b zu erleichtern. tf.contrib.rnn.static_rnn ruft die Arraygröße des zweiten Arguments ohne Erlaubnis ab, erstellt eine Zelle (erstes Argument) und kombiniert sie. Jede Zelle erhält eine Eingabe von [interne Merkmalsmenge = 60] x [Chargengröße = 100]. Gesamtschlussfolgerung: Innerhalb von rnn (Zellen für chuk_size erstellen):
bewerten, verlieren, trainieren entfallen.
python
def train(self):
input_ph = tf.placeholder(tf.float32, [None, self.chunk_size, self.num_of_input_nodes], name="input")
label_ph = tf.placeholder(tf.float32, [None, self.num_of_input_nodes], name="label")
istate_ph = tf.placeholder(tf.float32, [None, self.num_of_hidden_nodes * 2], name="istate")
prediction = self.inference(input_ph, istate_ph)
loss = self.loss(prediction, label_ph)
optimizer = self.training(loss)
evaluater = self.evaluate(prediction, label_ph)
summary = tf.summary.merge_all()
Definieren Sie eine Instanz für die Eingabe und das richtige Antwortetikett, eine Instanz von Statusinformationen, die zu Beginn von LSTM eingegeben werden sollen, und eine Instanz für jede Ausgabe. Zusammenfassung ist die Ausgabe des Ergebnisprotokolls, um das Debuggen mit Tensorboard usw. zu vereinfachen.
python
####### train ########
for epoch in range(self.num_of_epochs):
inputs, labels = self.rggen.get_batch(self.batch_size, self.chunk_size)
train_dict = {
input_ph: inputs,
label_ph: labels,
istate_ph: np.zeros((self.batch_size, self.num_of_hidden_nodes * 2)),
}
sess.run([optimizer], feed_dict=train_dict)
if (epoch) % 100 ==0:
summary_str, train_loss, (prediction, acc) = sess.run([summary, loss, evaluater], feed_dict=train_dict)
print("train#%d, loss: %e, accuracy: %e" % (epoch, train_loss, acc))
summary_writer.add_summary(summary_str, epoch)
Sammeln Sie die Eingaben im Wörterbuch und beginnen Sie zu lernen. Fügen Sie das Ergebnis zur Zusammenfassung hinzu. rggen.get_batch bringt die angegebenen Eingabedaten. Weitere Informationen finden Sie unter create_RG.py in github. Mal sehen.
python
####### test #########
inputs, labels = self.test_rggen.get_batch(self.batch_size, self.chunk_size)
test_dict = {
input_ph: inputs,
label_ph: labels,
istate_ph: np.zeros((self.batch_size, self.num_of_hidden_nodes * 2)),
}
prediction, acc = sess.run(evaluater, feed_dict=test_dict)
for pred, label in zip(prediction, labels):
print(np.argmax(pred) == np.argmax(label))
print(['{:.2f}'.format(n) for n in pred])
print(['{:.2f}'.format(n) for n in label])
Erstellen Sie auf ähnliche Weise ein Eingabewörterbuch zum Testen und zeigen Sie die Ausgabe an. {: .2f} ist eine Notation für die Ausgabe bis zur zweiten Dezimalstelle. #Das richtige Antwortetikett und das Ausgabeetikett werden nur so ausgegeben, dass sie vertikal angezeigt werden.
Referenzseite:
Recommended Posts