[PYTHON] Language prediction model by TensorFlow

This article introduces a learning model that predicts the languages generated by RG (Reber Grammar), ERG (Embedded Reber Grammar), and CERG (Continuous Embedded Reber Grammar).

The TensorFlow tutorial on implementing an LSTM trains on a language dataset called PTB (Penn Treebank), but it is hard to follow as an introduction. Therefore, as an introduction, we train a model that predicts the language produced by algorithms (RG, ERG, CERG) that automatically generate a language consisting of eight characters.

About RG, ERG

The reference site https://www.willamette.edu/~gorr/classes/cs449/reber.html explains RG and ERG in an easy-to-understand way. For example, "BTSSXSE.BPVVR." is one RG string, and "BPBTSXXVVEPE." is one ERG string. CERG is the language obtained by removing the terminator "." from ERG. This article predicts these languages, which are built from eight characters.

Source code

The source code, which is also available on GitHub, is listed below. It was tested with Python 3 and TensorFlow API r1.1. Because TensorFlow changes significantly between versions, do not assume that the code will run unmodified on a different version.

RG_prediction_model.py


#! /usr/local/bin/python
# -*- coding:utf-8 -*-

import tensorflow as tf
import numpy as np
import random
from create_RG import ERG_generator

num_of_sample_length = 10000

class RG_predict_model:
  """LSTM classifier trained on sequences from a Reber-grammar generator.

  Each example is a window of chunk_size one-hot character vectors.  The
  network is: input -> fully connected layer -> LSTM -> fully connected
  layer -> logits over the CHAR_VEC character classes.  The code targets the
  TensorFlow 1.x graph API (placeholders, sessions, tf.contrib.rnn).
  """

  def __init__(self, data_model):
    """Set the hyper-parameters and generate the train and test data.

    Args:
      data_model: zero-argument callable returning a generator object that
        offers generate(n), get_batch(batch_size, chunk_size) and a CHAR_VEC
        attribute (used as both the input and the output width).

    Raises:
      Exception: anything raised while building the generators is re-raised
        unchanged after a short diagnostic has been printed.
    """
    self.num_of_hidden_nodes = 60  # width of the projection layer and of the LSTM
    self.chunk_size = 20  # number of consecutive characters fed per prediction
    self.model_file_name = "./tmp/model.ckpt"
    self.batch_size = 100
    self.forget_bias = 0.8
    self.learning_rate = 0.001
    self.num_of_epochs = 50000  # counts training steps (one mini-batch each)
    try:
      # train data set
      self.rggen = data_model()
      self.rggen.generate(num_of_sample_length)
      self.num_of_output_nodes = self.rggen.CHAR_VEC
      self.num_of_input_nodes = self.rggen.CHAR_VEC
      # test data set (a separate generator instance)
      self.test_rggen = data_model()
      self.test_rggen.generate(num_of_sample_length)
    except Exception:
      # Only real errors are reported here; KeyboardInterrupt and SystemExit
      # pass through without the misleading generator message.
      print("could not specify generator model")
      raise

  def inference(self, input_ph, istate_ph):
    """Build the network and return the logits of the last time step.

    Args:
      input_ph: float tensor (batch_size, chunk_size, num_of_input_nodes).
      istate_ph: float tensor (batch_size, num_of_hidden_nodes * 2) holding
        the initial LSTM state.

    Returns:
      Tensor (batch_size, num_of_output_nodes) of unnormalised scores.
    """
    with tf.name_scope("inference"):
      # NOTE: the creation order of these ops is kept as-is so that runs
      # seeded through tf.set_random_seed keep the same initial values.
      weight1_var = tf.Variable(tf.truncated_normal(
          [self.num_of_input_nodes, self.num_of_hidden_nodes], stddev=0.1), name="weight1")
      weight2_var = tf.Variable(tf.truncated_normal(
          [self.num_of_hidden_nodes, self.num_of_output_nodes], stddev=0.1), name="weight2")
      bias1_var = tf.Variable(tf.truncated_normal(
          [self.num_of_hidden_nodes], stddev=0.1), name="bias1")
      bias2_var = tf.Variable(tf.truncated_normal(
          [self.num_of_output_nodes], stddev=0.1), name="bias2")

      # Rearrange the batch so the input projection is a single matmul and
      # static_rnn receives one tensor per time step.
      in1 = tf.transpose(input_ph, [1, 0, 2])  # (chunk_size, batch_size, CHAR_VEC_DIM)
      in2 = tf.reshape(in1, [-1, self.num_of_input_nodes])  # (chunk_size * batch_size, CHAR_VEC_DIM)
      in3 = tf.matmul(in2, weight1_var) + bias1_var  # (chunk_size * batch_size, num_of_hidden_nodes)
      in4 = tf.split(in3, self.chunk_size, axis=0)  # chunk_size * (batch_size, num_of_hidden_nodes)

      # state_is_tuple=False keeps the state as one tensor, matching the
      # (batch, 2 * num_of_hidden_nodes) initial-state placeholder.
      cell = tf.contrib.rnn.BasicLSTMCell(
          self.num_of_hidden_nodes, forget_bias=self.forget_bias, state_is_tuple=False)
      outputs, _ = tf.contrib.rnn.static_rnn(cell, in4, initial_state=istate_ph)
      # Only the output of the final time step is used for the prediction.
      return tf.matmul(outputs[-1], weight2_var) + bias2_var

  def evaluate(self, output, label):
    """Return (softmax probabilities, accuracy) and log the accuracy.

    Accuracy is the fraction of rows whose arg-max logit equals the arg-max
    of the label row.
    """
    with tf.name_scope("evaluate"):
      prediction = tf.nn.softmax(output)
      correct_prediction = tf.equal(tf.argmax(output, 1), tf.argmax(label, 1))
      accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
      tf.summary.scalar("accuracy", accuracy)
      return prediction, accuracy

  def loss(self, output, label):
    """Return the softmax cross-entropy between the logits and the labels."""
    with tf.name_scope("loss"):
      # With its default reduction tf.losses.softmax_cross_entropy already
      # yields a scalar, so no additional reduce_mean is needed.
      loss = tf.losses.softmax_cross_entropy(label, output)
      tf.summary.scalar("loss", loss)
      return loss

  def training(self, loss):
    """Return the Adam op that minimises loss with self.learning_rate."""
    with tf.name_scope("training"):
      return tf.train.AdamOptimizer(learning_rate=self.learning_rate).minimize(loss)

  def _feed_dict(self, input_ph, label_ph, istate_ph, inputs, labels):
    """Build the feed dict for one batch with an all-zero initial LSTM state.

    The state is sized from the batch actually supplied, so it always has as
    many rows as the inputs it is fed with.
    """
    return {
      input_ph: inputs,
      label_ph: labels,
      istate_ph: np.zeros((len(inputs), self.num_of_hidden_nodes * 2)),
    }

  def train(self):
    """Build the graph, train it, report on one test batch and save the model.

    Side effects: writes summaries to ./tmp/RG_log, prints progress every 100
    steps and a per-example report for the test batch, and saves a checkpoint
    to self.model_file_name.
    """
    input_ph = tf.placeholder(tf.float32, [None, self.chunk_size, self.num_of_input_nodes], name="input")
    # Labels are compared with the logits, so they share the output width.
    label_ph = tf.placeholder(tf.float32, [None, self.num_of_output_nodes], name="label")
    istate_ph = tf.placeholder(tf.float32, [None, self.num_of_hidden_nodes * 2], name="istate")
    logits = self.inference(input_ph, istate_ph)
    loss = self.loss(logits, label_ph)
    optimizer = self.training(loss)
    evaluater = self.evaluate(logits, label_ph)
    summary = tf.summary.merge_all()

    with tf.Session() as sess:
      summary_writer = tf.summary.FileWriter("./tmp/RG_log", graph=sess.graph)
      try:
        sess.run(tf.global_variables_initializer())
        ####### train ########
        for epoch in range(self.num_of_epochs):
          inputs, labels = self.rggen.get_batch(self.batch_size, self.chunk_size)
          train_dict = self._feed_dict(input_ph, label_ph, istate_ph, inputs, labels)
          sess.run(optimizer, feed_dict=train_dict)

          if epoch % 100 == 0:
            # Re-evaluate on the batch just trained on, purely for logging.
            summary_str, train_loss, (_, acc) = sess.run(
                [summary, loss, evaluater], feed_dict=train_dict)
            print("train#%d, loss: %e, accuracy: %e" % (epoch, train_loss, acc))
            summary_writer.add_summary(summary_str, epoch)

        ####### test #########
        inputs, labels = self.test_rggen.get_batch(self.batch_size, self.chunk_size)
        test_dict = self._feed_dict(input_ph, label_ph, istate_ph, inputs, labels)
        test_prediction, _ = sess.run(evaluater, feed_dict=test_dict)
        for pred, label in zip(test_prediction, labels):
          # hit / miss, then the predicted distribution above the true label
          print(np.argmax(pred) == np.argmax(label))
          print(['{:.2f}'.format(n) for n in pred])
          print(['{:.2f}'.format(n) for n in label])

        ####### save ########
        print("Training has done successfully")
        saver = tf.train.Saver()
        saver.save(sess, self.model_file_name)
      finally:
        # Flush and release the event file even when training is interrupted.
        summary_writer.close()

if __name__ == '__main__':
  # Seed Python, NumPy and TensorFlow alike so that a run is repeatable.
  for seed_rng in (random.seed, np.random.seed, tf.set_random_seed):
    seed_rng(0)
  # Train on data from the ERG generator.
  RG_predict_model(ERG_generator).train()

Next, the details will be explained in order.

python


def __init__(self, data_model):
  """Set the hyper-parameters and generate the train and test data.

  Args:
    data_model: zero-argument callable returning a generator object that
      offers generate(n) and a CHAR_VEC attribute (used as both the input
      and the output width).

  Raises:
    Exception: anything raised while building the generators is re-raised
      unchanged after a short diagnostic has been printed.
  """
  self.num_of_hidden_nodes = 60  # feature width after the first fully connected layer
  self.chunk_size = 20  # number of consecutive characters fed per prediction
  self.model_file_name = "./tmp/model.ckpt"
  self.batch_size = 100
  self.forget_bias = 0.8
  self.learning_rate = 0.001
  self.num_of_epochs = 50000
  try:
    # train data set
    self.rggen = data_model()
    self.rggen.generate(num_of_sample_length)
    self.num_of_output_nodes = self.rggen.CHAR_VEC
    self.num_of_input_nodes = self.rggen.CHAR_VEC
    # test data set (a separate generator instance)
    self.test_rggen = data_model()
    self.test_rggen.generate(num_of_sample_length)
  except Exception:
    # Only real errors are reported here; KeyboardInterrupt and SystemExit
    # pass through without the misleading generator message.
    print("could not specify generator model")
    raise

Because each input character is a one-hot vector, it is an 8-dimensional vector (e.g. B = (1,0,0,0,0,0,0,0)). Before it is fed to the LSTM cell, a fully connected layer is inserted to expand the features to a num_of_hidden_nodes = 60-dimensional vector. The LSTM needs a parameter that determines how many previous inputs affect the output; this is specified by chunk_size. Here, 20 consecutive characters are input for one prediction. The data_model argument takes one of the RG, ERG, or CERG generator classes (the full listing passes ERG_generator).

python


def inference(self, input_ph, istate_ph):
  """Build the network and return the logits of the last time step.

  Pipeline: input -> fully connected layer -> LSTM -> fully connected layer.

  Args:
    input_ph: tensor of shape (batch_size, chunk_size, num_of_input_nodes).
    istate_ph: initial LSTM state handed to static_rnn.

  Returns:
    Tensor of shape (batch_size, num_of_output_nodes).
  """
  def make_variable(shape, name):
    # Trainable variable initialised from a truncated normal (stddev 0.1).
    return tf.Variable(tf.truncated_normal(shape, stddev=0.1), name=name)

  n_in = self.num_of_input_nodes
  n_hidden = self.num_of_hidden_nodes
  n_out = self.num_of_output_nodes

  with tf.name_scope("inference"):
    # Variables are created in the same order as before: weights, then biases.
    w_in = make_variable([n_in, n_hidden], "weight1")
    w_out = make_variable([n_hidden, n_out], "weight2")
    b_in = make_variable([n_hidden], "bias1")
    b_out = make_variable([n_out], "bias2")

    # Go time-major and flatten so the input projection is a single matmul,
    # then cut the result back into one tensor per time step.
    time_major = tf.transpose(input_ph, [1, 0, 2])  # (chunk_size, batch_size, CHAR_VEC_DIM)
    flat = tf.reshape(time_major, [-1, n_in])  # (chunk_size * batch_size, CHAR_VEC_DIM)
    projected = tf.matmul(flat, w_in) + b_in  # (chunk_size * batch_size, num_of_hidden_nodes)
    per_step = tf.split(projected, self.chunk_size, axis=0)  # chunk_size * (batch_size, num_of_hidden_nodes)

    lstm_cell = tf.contrib.rnn.BasicLSTMCell(
        n_hidden, forget_bias=self.forget_bias, state_is_tuple=False)
    step_outputs, _ = tf.contrib.rnn.static_rnn(
        lstm_cell, per_step, initial_state=istate_ph)
    # Only the final time step feeds the output layer.
    return tf.matmul(step_outputs[-1], w_out) + b_out

Input (8D) -> fully connected layer -> (60D) -> LSTM -> (60D) -> fully connected layer -> output (8D). in1 to in4 only rearrange the input to make the Wx + b calculation easy. tf.contrib.rnn.static_rnn automatically takes the length of the list passed as its second argument, unrolls the cell (first argument) that many times, and chains the steps together. Each step receives an input of [batch size = 100] x [internal features = 60]. Overall inference: LSTM_inference.png Inside rnn (creating cells for chunk_size): LSTM_cell.png

The explanations of evaluate, loss, and training are omitted.

python


def train(self):
    """Graph-construction part of train (excerpt of the full listing).

    Creates the placeholders and wires inference, loss, training and
    evaluate together; the session loop follows in the full method.
    """
    # One example is chunk_size vectors of num_of_input_nodes values each.
    input_ph = tf.placeholder(tf.float32, [None, self.chunk_size, self.num_of_input_nodes], name="input")
    label_ph = tf.placeholder(tf.float32, [None, self.num_of_input_nodes], name="label")
    # Initial LSTM state, num_of_hidden_nodes * 2 values per example.
    istate_ph = tf.placeholder(tf.float32, [None, self.num_of_hidden_nodes * 2], name="istate")
    # Output of the network as returned by inference.
    prediction = self.inference(input_ph, istate_ph)
    loss = self.loss(prediction, label_ph)
    optimizer = self.training(loss)
    # evaluate returns a (prediction, accuracy) pair of tensors.
    evaluater = self.evaluate(prediction, label_ph)
    # Merges every tf.summary op registered so far into a single op.
    summary = tf.summary.merge_all()

Define placeholders for the input and the correct label, a placeholder for the state fed to the LSTM at the start, and the graph nodes for each output. summary collects the logged results so that they can be inspected with TensorBoard, which makes debugging easier.

python


####### train ########
      # Excerpt from train: every iteration fetches one batch from the
      # training generator and runs a single optimizer step on it.
      for epoch in range(self.num_of_epochs):
        inputs, labels = self.rggen.get_batch(self.batch_size, self.chunk_size)
        train_dict = {
          input_ph: inputs,
          label_ph: labels,
          # The LSTM always starts from an all-zero state.
          istate_ph: np.zeros((self.batch_size, self.num_of_hidden_nodes * 2)),
        }
        sess.run([optimizer], feed_dict=train_dict)

        if (epoch) % 100 ==0:
          # Every 100 iterations, evaluate on the same batch and log the result.
          summary_str, train_loss, (prediction, acc) = sess.run([summary, loss, evaluater], feed_dict=train_dict)
          print("train#%d, loss: %e, accuracy: %e" % (epoch, train_loss, acc))
          summary_writer.add_summary(summary_str, epoch)

Collect the inputs in a dictionary and run a training step. The results are added to the summary. rggen.get_batch returns the requested input data; for details, see create_RG.py on GitHub.

python


####### test #########
      # Excerpt from train: evaluate one batch taken from the test generator.
      inputs, labels = self.test_rggen.get_batch(self.batch_size, self.chunk_size)
      test_dict = {
        input_ph: inputs,
        label_ph: labels,
        # The LSTM starts from an all-zero state, as in training.
        istate_ph: np.zeros((self.batch_size, self.num_of_hidden_nodes * 2)),
      }
      prediction, acc = sess.run(evaluater, feed_dict=test_dict)
      for pred, label in zip(prediction, labels):
        # Hit / miss flag, then the prediction printed above the true label.
        print(np.argmax(pred) == np.argmax(label))
        print(['{:.2f}'.format(n) for n in pred])
        print(['{:.2f}'.format(n) for n in label])

Similarly, create an input dictionary for the test and display the output. {:.2f} is the format specification for printing a number to two decimal places. The predicted output and the correct label are simply printed one above the other so that they can be compared.

Reference site:

- About ERG: https://www.willamette.edu/~gorr/classes/cs449/reber.html
- About the TensorFlow implementation of LSTM: http://qiita.com/yukiB/items/f6314d2861fc8d9b739f and http://www.madopro.net/entry/RNNLSTMLanguageModel

Recommended Posts

Language prediction model by TensorFlow
Prediction model construction ①
Faker summary by language
TensorFlow Tutorial-Sequence Transformation Model (Translation)
Sazae-san's rock-paper-scissors prediction by LightGBM
Pokemon classification by topic model
Markov switching model by Python
Super-resolution technology-SRCNN-Implemented (Tensorflow 2.0) Prediction phase
Challenge image classification by TensorFlow2 + Keras 4 ~ Let's predict with trained model ~
Stack processing speed comparison by language
Customize Model / Layer / Metric with TensorFlow
Natural Language: Word2Vec Part3 --CBOW model
By language: Regular expressions for passwords
How to convert Tensorflow model to Lite
Estimated Probit model by Binary Response model
Model generated by Variational Autoencoder (VAE)
100 Language Processing Knock Chapter 1 by Python
100 language processing knock-74 (using scikit-learn): Prediction
Natural Language: Word2Vec Part2 --Skip-gram model
Learn the basics of document classification by natural language processing, topic model