[PYTHON] [TensorFlow / Keras] The road to assembling an RNN with your favorite structure

Introduction

An RNN (Recurrent Neural Network) is a type of neural network that takes time-series data as input and determines its output from the "state" at the previous time step in addition to the input at the current time step. LSTM (Long Short-Term Memory) is probably the best-known example. Time-series data here means data whose meaning lies in the sequence as a whole, such as video or text. An ordinary neural network takes data of a fixed format, such as a single image or character, as input. With video or text, where such items are lined up in order, the arrangement carries as much meaning as the individual images (frames) or characters. The RNN is a structure designed to handle this kind of data well.

That said, RNNs are honestly harder to get into than ordinary fully connected layers. I found them hard too.

So in this article I first want to understand what an RNN actually does, and then become able to build my own RNN that uses the "state" of the previous time step.

Verification environment

Target

I would like to resolve the following questions (they are ones I actually struggled with).

- I don't really understand the difference between RNN, SimpleRNN, and SimpleRNNCell.
- I want to understand what is inside layers such as LSTM.
- I want to write a non-standard RNN layer myself in order to reproduce the experiments of a paper.

[^ basiclstm]: The basics of LSTM that can't be heard now-HELLO CYBERNETICS

The goal is to be able to build your own RNN in Keras from a network diagram (graph) and formulas like those on the reference page [^ basiclstm].

Conversely, this page does not cover the following. I may write about them on another occasion.

- How to design a network structure for the problem you want to solve
- How to handle variable-length input (sequences whose length differs per sample)
  - [Try basic RNN (LSTM) with Keras - Qiita](https://qiita.com/everylittle/items/ba821e93d275a421ca2b#%E5%8F%AF%E5%A4%89%E9%95%B7%E3%81%AE%E7%B3%BB%E5%88%97%E3%82%92%E5%85%A5%E5%8A%9B%E3%81%99%E3%82%8B%E5%A0%B4%E5%90%88-1)
- How to use return_state or stateful
- How to use a Bidirectional RNN

RNN basics

The essence of an RNN is that **"the output depends on the input at the current time step and the "state" at the previous time step"**. In an ordinary fully connected or convolutional layer the output depends only on the current input, whereas an RNN can also use information from earlier inputs. What to carry over to the next time step as "state" is up to you.

As the left side of the figure below shows, an RNN can be drawn as a network (cell) with a recursive structure. Its actual operation is easier to understand in the unrolled form on the right side, where the loop is expanded over time (source [^ 1]).

image.png

Given the input sequence $x_1, x_2, ..., x_t, ...$, the outputs $o_1, o_2, ..., o_t, ...$ and the states $s_1, s_2, ..., s_t, ...$ are determined by the following equations.

\begin{align}
s_t &= f(Ux_t + Ws_{t-1} + b) \\
o_t &= h(Vs_t)
\end{align}

Here $U, V, W$ are matrices and $b$ is a column vector; these are the layer weights (the parameters to be trained). $f$ and $h$ are activation functions. The inputs, outputs, and states $x_t, o_t, s_t$ are also column vectors.
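As a rough illustration of the two equations above, the following NumPy sketch runs the recurrence over a short sequence. The sizes, the random weights, the choice of tanh for $f$ and the identity for $h$, and the zero initial state are all assumptions made for this example, and the function name `run_recurrence` is hypothetical.

```python
import numpy as np

def run_recurrence(xs, U, W, V, b, f=np.tanh, h=lambda a: a):
    """Apply s_t = f(U x_t + W s_{t-1} + b), o_t = h(V s_t) to each x_t in turn."""
    s = np.zeros(W.shape[0])        # assumed initial state s_0 = 0
    outputs, states = [], []
    for x in xs:                    # one iteration per time step
        s = f(U @ x + W @ s + b)    # new state from the current input and previous state
        outputs.append(h(V @ s))    # output computed from the new state
        states.append(s)
    return np.array(outputs), np.array(states)

rng = np.random.default_rng(0)
input_dim, state_dim, output_dim, steps = 2, 3, 1, 5   # illustrative sizes
U = rng.normal(size=(state_dim, input_dim))
W = rng.normal(size=(state_dim, state_dim))
V = rng.normal(size=(output_dim, state_dim))
b = np.zeros(state_dim)

xs = rng.normal(size=(steps, input_dim))
outputs, states = run_recurrence(xs, U, W, V, b)
print(outputs.shape, states.shape)   # (5, 1) (5, 3)
```

Each pass through the loop sees only the current input and the state left by the previous pass, which is the whole idea of the recurrence.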

The simplest RNN

First, let's get a feel for the RNN. As a simple example, consider a network that sequentially outputs the partial sums of the input sequence (the sum of all values from the beginning up to that point). Here the partial sum is taken as the "state", and the state is output as is. For example, the state and output change with the input as in the table below.

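| t | 1 | 2 | 3 | 4 | 5 | 6 | 7 | ... |
|---|---|---|---|---|---|---|---|---|
| $x_t$ | 1 | 3 | 2 | 4 | 1 | 0 | 1 | |
| $s_t$ | 1 | 4 | 6 | 10 | 11 | 11 | 12 | |
| $o_t$ | 1 | 4 | 6 | 10 | 11 | 11 | 12 | |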
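The table above can be reproduced with a few lines of plain Python, treating the running sum as the state carried from one step to the next. This is only a sketch of the idea; the function name `partial_sums` is made up for the illustration.

```python
def partial_sums(xs):
    """Return the running sums of xs, carrying the sum so far as the state."""
    state = 0                  # state before the first input
    outputs = []
    for x in xs:
        state = state + x      # s_t = x_t + s_{t-1}
        outputs.append(state)  # o_t = s_t
    return outputs

print(partial_sums([1, 3, 2, 4, 1, 0, 1]))  # [1, 4, 6, 10, 11, 11, 12]
```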

In TensorFlow + Keras, the layer tf.keras.layers.SimpleRNN lets you define a network of the form

\begin{align}
o_t = s_t = f(Ux_t + Ws_{t-1} + b) \tag{1}
\end{align}

If you set $f(x) = x$ and train it on sequences of numbers and their partial sums, the weights in

\begin{align}
o_t = s_t = Ux_t + Ws_{t-1} + b
\end{align}

are expected to approach $U = W = 1,\; b = 0$ (this time we are dealing with one-dimensional values, so you can think of $U, W, b$ as scalars).

Let's train it with the code below. The training data are sequences of random numbers of length 30 and the sequences of partial sums computed from them.

first.py


import tensorflow as tf 
import numpy as np 
from tensorflow.keras import Sequential 
from tensorflow.keras.layers import SimpleRNN
from tensorflow.keras.optimizers import SGD

tf.random.set_seed(111)
np.random.seed(111)

model = Sequential([
    SimpleRNN(1, activation=None, input_shape=(None, 1), return_sequences=True)
])
model.compile(optimizer=SGD(learning_rate=0.0001), loss="mean_squared_error")

n = 51200
x = np.random.random((n, 30, 1))
y = x.cumsum(axis=1)

model.fit(x, y, batch_size=512, epochs=100)

model.layers[0].weights
# [<tf.Variable 'simple_rnn/kernel:0' shape=(1, 1) dtype=float32, numpy=array([[0.6021545]], dtype=float32)>,
#  <tf.Variable 'simple_rnn/recurrent_kernel:0' shape=(1, 1) dtype=float32, numpy=array([[1.0050855]], dtype=float32)>,
#  <tf.Variable 'simple_rnn/bias:0' shape=(1,) dtype=float32, numpy=array([0.20719269], dtype=float32)>]

model.predict(np.ones((1, 30, 1)) * 0.5).flatten()
# array([ 0.5082699,  1.0191246,  1.5325773,  2.0486412,  2.5673294,
#         3.0886555,  3.6126328,  4.1392746,  4.6685944,  5.2006063,
#         5.7353234,  6.27276  ,  6.8129296,  7.3558464,  7.901524 ,
#         8.449977 ,  9.00122  ,  9.555265 , 10.112128 , 10.6718235,
#        11.2343645, 11.799767 , 12.368044 , 12.939212 , 13.513284 ,
#        14.090276 , 14.670201 , 15.253077 , 15.838916 , 16.427734 ],
#       dtype=float32)

The error looks large, but this is only a sample, so it is enough to get the general idea (accuracy is not the goal here). Training yields

\begin{align}
o_t = s_t = 0.6022x_t + 1.0051s_{t-1} + 0.2072
\end{align}
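As a sanity check, the learned formula can be iterated by hand and compared with the model.predict output listed above. The sketch below copies the three learned weights from the printed result and assumes a zero initial state; `simple_rnn_identity` is a name made up for this example.

```python
# Learned weights copied from the model.layers[0].weights output above.
U, W, b = 0.6021545, 1.0050855, 0.20719269

def simple_rnn_identity(xs, U, W, b):
    """Iterate o_t = s_t = U*x_t + W*s_{t-1} + b from s_0 = 0 (scalar case)."""
    s = 0.0
    outputs = []
    for x in xs:
        s = U * x + W * s + b
        outputs.append(s)
    return outputs

outputs = simple_rnn_identity([0.5] * 30, U, W, b)
print(outputs[:3])  # close to the first three values of model.predict above
```

For the constant input 0.5, the per-step increment is U * 0.5 + b, roughly 0.508, which is why the output resembles the expected partial sums even though U and b individually are far from 1 and 0.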

Explanation of SimpleRNN

SimpleRNN(1, activation=None, input_shape=(None, 1), return_sequences=True)

- The first argument, 1, is the number of dimensions of $o_t, s_t$. They are scalars this time, so 1 is specified.
- `activation` corresponds to $f$ in equation (1). It is the identity function this time, so `None` is specified. For `Dense` and similar layers the default is the identity function, but note that for the RNN layers the default is `tanh`.
- `input_shape` takes the form `(None, dimension)`. The first `None` corresponds to the length of each input sequence (it is `None` to accept variable-length input). The second is the number of dimensions of $x_t$ (1 this time).
- `return_sequences=True` means that the output at every time step is returned as the layer output. With this setting, the output shape for a sequence of length 30 is `(batch_size, 30, 1)`. If it is `False`, the layer outputs only the last time step ($o_{30}$ in this case) and the output shape is `(batch_size, 1)`. Choose according to the problem setting and the format of the training data.

See the official documentation for details. tf.keras.layers.SimpleRNN | TensorFlow Core v2.1.0

Rewriting using RNN

The code that creates the model above is equivalent to:

from tensorflow.keras.layers import RNN, SimpleRNN, SimpleRNNCell

model = Sequential([
    #SimpleRNN(1, activation=None, input_shape=(None, 1), return_sequences=True) 
    RNN(SimpleRNNCell(1, activation=None), input_shape=(None, 1), return_sequences=True)
])

The cell is the inside of the for loop of a RNN layer. Wrapping a cell inside a tf.keras.layers.RNN layer gives you a layer capable of processing batches of sequences, e.g. RNN(LSTMCell(10)).

Recurrent Neural Networks (RNN) with Keras | TensorFlow Core

SimpleRNNCell defines the operation (cell) for a single time step, and wrapping it in RNN() defines a layer that applies it across whole sequences in a batch.

In other words, you should be able to define an RNN with whatever structure you like by defining your own per-step processing equivalent to SimpleRNNCell and wrapping it in RNN().

This clears up the "difference between RNN, SimpleRNN, and SimpleRNNCell" raised at the beginning.
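The division of labor between a cell and the RNN wrapper can be pictured with plain Python. This is a conceptual sketch only, not the Keras implementation: `partial_sum_cell` and `unroll_cell` are hypothetical names, batching is ignored, and the `return_sequences` argument merely mimics the meaning of the Keras option.

```python
def partial_sum_cell(x, states):
    """One time step: return (output, new_states), like a cell's call()."""
    s = states[0] + x
    return s, [s]

def unroll_cell(cell, inputs, initial_states, return_sequences=True):
    """Loop a cell over a sequence, as the RNN wrapper does conceptually."""
    states = initial_states
    outputs = []
    for x in inputs:                      # the "for loop" that the cell sits inside
        output, states = cell(x, states)  # states produced here feed the next step
        outputs.append(output)
    return outputs if return_sequences else outputs[-1]

xs = [1, 3, 2, 4, 1, 0, 1]
print(unroll_cell(partial_sum_cell, xs, [0]))                          # [1, 4, 6, 10, 11, 11, 12]
print(unroll_cell(partial_sum_cell, xs, [0], return_sequences=False))  # 12
```

Swapping in a different cell function changes the per-step behavior without touching the loop, which is the same separation that RNN(SimpleRNNCell(...)) expresses.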

Let's take a look at the contents of SimpleRNNCell

In preparation for building an RNN of your own design, let's first see what the existing SimpleRNNCell does. When writing your own, the shortcut is to start by imitating an existing implementation.

(Also, I think you can refer to the example of tf.keras.layers.RNN | TensorFlow Core v2.1.0)

The source code for SimpleRNNCell can be found below. tensorflow/recurrent.py at v2.1.0 · tensorflow/tensorflow · GitHub

Let's take a look at the contents by excerpting from this.

First, the class definition inherits from Layer. DropoutRNNCellMixin appears to be inherited to support Dropout, but that is not the main topic, so I won't go into it here.

recurrent.py


class SimpleRNNCell(DropoutRNNCellMixin, Layer):

In build(), the weights required by the layer are defined with `add_weight`. Dense and other non-RNN layers do the same thing. Corresponding to equation (1), `kernel` is $U$, `recurrent_kernel` is $W$, and `bias` is $b$.

Then call() defines the actual processing. This is the most important part.

recurrent.py


  def call(self, inputs, states, training=None):
    prev_output = states[0]
    dp_mask = self.get_dropout_mask_for_cell(inputs, training)
    rec_dp_mask = self.get_recurrent_dropout_mask_for_cell(
        prev_output, training)

    if dp_mask is not None:
      h = K.dot(inputs * dp_mask, self.kernel)
    else:
      h = K.dot(inputs, self.kernel)
    if self.bias is not None:
      h = K.bias_add(h, self.bias)

    if rec_dp_mask is not None:
      prev_output = prev_output * rec_dp_mask
    output = h + K.dot(prev_output, self.recurrent_kernel)
    if self.activation is not None:
      output = self.activation(output)

    return output, [output]

The input $x_t$ arrives in `inputs`, and the state $s_{t-1}$ (produced at the previous time step) arrives in `states`. `states` is passed as a list so that a cell can have multiple states, which is why the code first extracts `states[0]`. Setting the Dropout-related processing aside, the main part is

h = K.dot(inputs, self.kernel)
if self.bias is not None:
  h = K.bias_add(h, self.bias)
output = h + K.dot(prev_output, self.recurrent_kernel)
if self.activation is not None:
  output = self.activation(output)
return output, [output]

That's all there is to it. In TensorFlow, each input/output sample is represented as a row vector, so the order of the matrix products is reversed, but you can see that this corresponds to equation (1). The final return gives the layer output $o_t$ and the state $s_t$ to carry over to the next time step. The state returned here is received by call() at the next time step. Like the argument, the state is returned as a list. If you return multiple states here, you receive multiple states at the next time step.
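The remark about row vectors and the reversed order of the matrix product can be checked numerically. The following NumPy sketch uses arbitrary sizes and random values chosen for the illustration; it compares the column-vector form $Ux$ from the equations with the row-vector form used in the code.

```python
import numpy as np

rng = np.random.default_rng(0)
U = rng.normal(size=(3, 2))      # formula convention: maps a 2-dim input to a 3-dim state
x = rng.normal(size=(2, 1))      # column vector, as in the equations

column_result = U @ x            # U x, shape (3, 1)

kernel = U.T                     # stored as (input_dim, units), like `kernel` in the layer
inputs = x.T                     # row vector, shape (1, 2): one sample per row
row_result = inputs @ kernel     # x^T U^T, shape (1, 3)

print(np.allclose(column_result.T, row_result))  # True
```

Stacking several samples as rows of `inputs` then gives a whole batch with a single matrix product, which is one reason the row convention is convenient.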

Let's take a look at LSTM

Next, let's look at the LSTM layer as a slightly more complicated example. First, try using LSTM instead of SimpleRNN with the same problem settings as before.

lstm.py


import tensorflow as tf 
import numpy as np 
from tensorflow.keras import Sequential 
from tensorflow.keras.layers import LSTM
from tensorflow.keras.optimizers import SGD

tf.random.set_seed(111)
np.random.seed(111)

model = Sequential([
    LSTM(1, activation=None, input_shape=(None, 1), return_sequences=True)
])
model.compile(optimizer=SGD(learning_rate=0.0001), loss="mean_squared_error")

n = 51200
x = np.random.random((n, 30, 1))
y = x.cumsum(axis=1)

model.fit(x, y, batch_size=512, epochs=100)

model.layers[0].weights                                                   
# [<tf.Variable 'lstm/kernel:0' shape=(1, 4) dtype=float32, numpy=
#  array([[ 0.11471224, -0.15296884,  0.82662594, -0.14256166]],
#        dtype=float32)>,
#  <tf.Variable 'lstm/recurrent_kernel:0' shape=(1, 4) dtype=float32, numpy=
#  array([[ 0.10575113,  0.16468772, -0.05777477,  0.20210776]],
#        dtype=float32)>,
#  <tf.Variable 'lstm/bias:0' shape=(4,) dtype=float32, numpy=array([0.4812489, 1.6566612, 1.1815464, 0.4349145], dtype=float32)>]

model.predict(np.ones((1, 30, 1)) * 0.5).flatten()
# array([ 0.59412843,  1.1486205 ,  1.6723596 ,  2.1724625 ,  2.6546886 ,
#         3.1237347 ,  3.5834525 ,  4.0370073 ,  4.486994  ,  4.93552   ,
#         5.38427   ,  5.8345466 ,  6.2873073 ,  6.7431927 ,  7.20255   ,
#         7.6654577 ,  8.131752  ,  8.601054  ,  9.072805  ,  9.546291  ,
#        10.0206785 , 10.495057  , 10.968457  , 11.439891  , 11.908364  ,
#        12.372919  , 12.832628  , 13.286626  , 13.734106  , 14.174344  ],
#       dtype=float32)

As for simply using the layer, I already tried that in a previous article (with the same problem setting): Try return_sequences = True on Keras RNN (LSTM) - Qiita. This time, let's dig a little deeper into the implementation.

Similar to SimpleRNN, this can also be separated into cells from RNN to achieve equivalent processing [^ 2].

[^ 2]: Actually, using LSTM allows a fast CuDNN implementation (in some cases), so if you just want to use LSTM, there is no point in separating it.

from tensorflow.keras.layers import LSTM, RNN, LSTMCell

model = Sequential([
    # LSTM(1, activation=None, input_shape=(None, 1), return_sequences=True)
    RNN(LSTMCell(1, activation=None), input_shape=(None, 1), return_sequences=True)
])

After this, we will focus on the processing of the cell part LSTMCell.

LSTM formula

Before looking at the implementation, let's first check the LSTM processing. The theoretical meaning of each gate is not mentioned here. (Formulas and figures are quoted from [^ basiclstm])

20170506172239.png

\begin{align}
o_t &= \sigma \left( W_ox_t + R_oh_{t-1} + b_o \right) \tag{2.1}\\
f_t &= \sigma \left( W_fx_t + R_fh_{t-1} + b_f \right) \tag{2.2}\\
i_t &= \sigma \left( W_ix_t + R_ih_{t-1} + b_i \right) \tag{2.3}\\
z_t &= \tanh \left( W_zx_t + R_zh_{t-1} + b_z \right) \tag{2.4}\\
c_t &= i_t \otimes z_t+c_{t-1} \otimes f_t  \tag{2.5}\\
h_t &= o_t \otimes \tanh(c_t) \tag{2.6}
\end{align}

Here $\otimes$ is the element-wise product, $\sigma$ is the sigmoid function, and $\tanh$ is the hyperbolic tangent.
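Equations (2.1) to (2.6) translate almost line for line into NumPy. The sketch below keeps the column-vector convention and the symbol names of the formulas; the sizes, the random weights, and the zero initial state are assumptions for the example, and `lstm_step` is a hypothetical helper, not Keras code.

```python
import numpy as np

def sigmoid(a):
    return 1.0 / (1.0 + np.exp(-a))

def lstm_step(x, h_prev, c_prev, p):
    """One LSTM time step written directly from equations (2.1) to (2.6)."""
    o = sigmoid(p["W_o"] @ x + p["R_o"] @ h_prev + p["b_o"])   # (2.1)
    f = sigmoid(p["W_f"] @ x + p["R_f"] @ h_prev + p["b_f"])   # (2.2)
    i = sigmoid(p["W_i"] @ x + p["R_i"] @ h_prev + p["b_i"])   # (2.3)
    z = np.tanh(p["W_z"] @ x + p["R_z"] @ h_prev + p["b_z"])   # (2.4)
    c = i * z + c_prev * f                                     # (2.5)
    h = o * np.tanh(c)                                         # (2.6)
    return h, c

rng = np.random.default_rng(0)
input_dim, units = 2, 3                       # illustrative sizes
p = {}
for gate in "ofiz":
    p["W_" + gate] = rng.normal(size=(units, input_dim))
    p["R_" + gate] = rng.normal(size=(units, units))
    p["b_" + gate] = np.zeros(units)

h, c = np.zeros(units), np.zeros(units)       # assumed zero initial state
for x in rng.normal(size=(4, input_dim)):     # four time steps
    h, c = lstm_step(x, h, c, p)
print(h.shape, c.shape)                       # (3,) (3,)
```

Both `h` and `c` have to be handed to the next call, so a cell implementing these equations carries two states.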

Decoding LSTMCell

Let's look at the implementation of LSTMCell based on equations (2.1) to (2.6). tensorflow/recurrent.py at v2.1.0 · tensorflow/tensorflow · GitHub

The opening of the class definition is the same as for SimpleRNNCell.

recurrent.py


class LSTMCell(DropoutRNNCellMixin, Layer):

The weights are defined in build().

recurrent.py


  def build(self, input_shape):
    default_caching_device = _caching_device(self)
    input_dim = input_shape[-1]
    self.kernel = self.add_weight(
        shape=(input_dim, self.units * 4),
        name='kernel',
        initializer=self.kernel_initializer,
        regularizer=self.kernel_regularizer,
        constraint=self.kernel_constraint,
        caching_device=default_caching_device)
    self.recurrent_kernel = self.add_weight(
        shape=(self.units, self.units * 4),
        name='recurrent_kernel',
        initializer=self.recurrent_initializer,
        regularizer=self.recurrent_regularizer,
        constraint=self.recurrent_constraint,
        caching_device=default_caching_device)

    if self.use_bias:
      if self.unit_forget_bias:

        def bias_initializer(_, *args, **kwargs):
          return K.concatenate([
              self.bias_initializer((self.units,), *args, **kwargs),
              initializers.Ones()((self.units,), *args, **kwargs),
              self.bias_initializer((self.units * 2,), *args, **kwargs),
          ])
      else:
        bias_initializer = self.bias_initializer
      self.bias = self.add_weight(
          shape=(self.units * 4,),
          name='bias',
          initializer=bias_initializer,
          regularizer=self.bias_regularizer,
          constraint=self.bias_constraint,
          caching_device=default_caching_device)
    else:
      self.bias = None
    self.built = True

Leaving the details aside, note that self.units * 4 appears in several places. In fact, kernel holds the concatenation of the four matrices $W_o, W_f, W_i, W_z$ [^ 4]. Similarly, recurrent_kernel holds the four matrices $R_o, R_f, R_i, R_z$ together, and bias holds the four vectors $b_o, b_f, b_i, b_z$ together. Of course, it would not be wrong to keep each of them as a separate variable instead (12 in total). As usual, rows and columns are transposed relative to the formulas: the formulas suggest that the number of rows would be quadrupled, but in the code it is the number of columns that is quadrupled.

[^ 4]: Besides reducing the number of variables, this has the advantage that the arguments of the activation functions in equations (2.1) to (2.4) can be computed in one go. If you pass `implementation=2` when creating LSTMCell(), the implementation that computes them all at once appears to be used.
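The equivalence between storing four matrices side by side and storing them separately can be confirmed with NumPy. In this sketch the sizes and random values are arbitrary, and `np.split` stands in for the split performed on the concatenated kernel.

```python
import numpy as np

rng = np.random.default_rng(0)
batch, input_dim, units = 5, 2, 3                     # illustrative sizes
inputs = rng.normal(size=(batch, input_dim))          # one sample per row
kernel = rng.normal(size=(input_dim, units * 4))      # four gate matrices side by side

# Split the kernel first, then take one matrix product per gate.
k_i, k_f, k_c, k_o = np.split(kernel, 4, axis=1)
separate = [inputs @ k for k in (k_i, k_f, k_c, k_o)]

# Take a single matrix product, then split the result into four parts.
fused = np.split(inputs @ kernel, 4, axis=1)

print(all(np.allclose(a, b) for a, b in zip(separate, fused)))  # True
```

The split is along axis 1 because each sample is a row, so the gates sit next to each other as groups of columns.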

call() is the main part. For simplicity, only the processing for `implementation=1` is shown.

recurrent.py


  def call(self, inputs, states, training=None):
    h_tm1 = states[0]  # previous memory state
    c_tm1 = states[1]  # previous carry state
    # (omitted)
      if 0 < self.dropout < 1.:
        # (omitted)
      else:
        inputs_i = inputs
        inputs_f = inputs
        inputs_c = inputs
        inputs_o = inputs
      k_i, k_f, k_c, k_o = array_ops.split(
          self.kernel, num_or_size_splits=4, axis=1)
      x_i = K.dot(inputs_i, k_i)
      x_f = K.dot(inputs_f, k_f)
      x_c = K.dot(inputs_c, k_c)
      x_o = K.dot(inputs_o, k_o)
      if self.use_bias:
        b_i, b_f, b_c, b_o = array_ops.split(
            self.bias, num_or_size_splits=4, axis=0)
        x_i = K.bias_add(x_i, b_i)
        x_f = K.bias_add(x_f, b_f)
        x_c = K.bias_add(x_c, b_c)
        x_o = K.bias_add(x_o, b_o)

      if 0 < self.recurrent_dropout < 1.:
        # (omitted)
      else:
        h_tm1_i = h_tm1
        h_tm1_f = h_tm1
        h_tm1_c = h_tm1
        h_tm1_o = h_tm1
      x = (x_i, x_f, x_c, x_o)
      h_tm1 = (h_tm1_i, h_tm1_f, h_tm1_c, h_tm1_o)
      c, o = self._compute_carry_and_output(x, h_tm1, c_tm1)
    # (omitted)
    h = o * self.activation(c)
    return h, [h, c]

In equations (2.1) to (2.6), $h_{t-1}$ and $c_{t-1}$ are used as information from the previous time step. **Therefore, both must be kept as state.** As mentioned above, you can handle more than one state by passing the states as a list.

In the first half, four values are computed: $W_ox_t + b_o, W_fx_t + b_f, W_ix_t + b_i, W_zx_t + b_z$. (Note that $W_z, b_z$ in equation (2.4) carry a different subscript from k_c, b_c in the code.) In the second half, _compute_carry_and_output() computes $c_t$ and $o_t$. Finally, $h_t$ is computed. Here $h_t$ is output as is, and $h_t$ and $c_t$ are returned as states for use at the next time step. The default value of `activation` is `tanh`, as in equation (2.6).

_compute_carry_and_output () is defined as follows.

recurrent.py


  def _compute_carry_and_output(self, x, h_tm1, c_tm1):
    """Computes carry and output using split kernels."""
    x_i, x_f, x_c, x_o = x
    h_tm1_i, h_tm1_f, h_tm1_c, h_tm1_o = h_tm1
    i = self.recurrent_activation(
        x_i + K.dot(h_tm1_i, self.recurrent_kernel[:, :self.units]))
    f = self.recurrent_activation(x_f + K.dot(
        h_tm1_f, self.recurrent_kernel[:, self.units:self.units * 2]))
    c = f * c_tm1 + i * self.activation(x_c + K.dot(
        h_tm1_c, self.recurrent_kernel[:, self.units * 2:self.units * 3]))
    o = self.recurrent_activation(
        x_o + K.dot(h_tm1_o, self.recurrent_kernel[:, self.units * 3:]))
    return c, o

In each case, the matrix product uses only part of recurrent_kernel. The matrix products correspond to $R_ih_{t-1}, R_fh_{t-1}, R_zh_{t-1}, R_oh_{t-1}$. Since x_i, x_f, x_c, x_o hold the computed values of $W_ix_t + b_i, W_fx_t + b_f, W_zx_t + b_z, W_ox_t + b_o$, the argument of each activation function can now be computed. The activation function recurrent_activation corresponds to the sigmoid function of equations (2.1) to (2.3); in this class its default value appears to be hard_sigmoid [^ 5]. (Note that the LSTM and LSTMCell exposed under tf.keras.layers in TensorFlow 2.x override this default with sigmoid.) The rest follows the formulas.

[^ 5]: Keras's hard_sigmoid is max(0, min(1, (0.2 * x) + 0.5)) - Qiita
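To tie the code walk-through back to numbers, the scalar sketch below replays the trained lstm.py model by hand, using the weights printed earlier in the gate order of the code (i, f, c, o). Two points are assumptions on my part: the initial state is zero, and the gates use the logistic sigmoid, which as far as I know is the default of tf.keras.layers.LSTM in TensorFlow 2.x, rather than the hard_sigmoid default seen in the recurrent.py class. `lstm_step` is a hypothetical helper.

```python
import math

def sigmoid(a):
    return 1.0 / (1.0 + math.exp(-a))

def hard_sigmoid(a):
    return max(0.0, min(1.0, 0.2 * a + 0.5))   # formula from footnote 5

# Learned weights copied from the lstm.py output above, in gate order (i, f, c, o).
kernel = [0.11471224, -0.15296884, 0.82662594, -0.14256166]
recurrent_kernel = [0.10575113, 0.16468772, -0.05777477, 0.20210776]
bias = [0.4812489, 1.6566612, 1.1815464, 0.4349145]

def lstm_step(x, h_prev, c_prev, gate_activation):
    """One scalar LSTM step following call() and _compute_carry_and_output()."""
    pre = [x * k + h_prev * r + b
           for k, r, b in zip(kernel, recurrent_kernel, bias)]
    i = gate_activation(pre[0])
    f = gate_activation(pre[1])
    c = f * c_prev + i * pre[2]     # activation=None, so the candidate is used as is
    o = gate_activation(pre[3])
    h = o * c                       # activation=None here as well
    return h, c

h, c = 0.0, 0.0                     # assumed zero initial state
outputs = []
for _ in range(30):
    h, c = lstm_step(0.5, h, c, sigmoid)
    outputs.append(h)
print(outputs[:2])                  # close to the first two values of model.predict above

hard_first, _ = lstm_step(0.5, 0.0, 0.0, hard_sigmoid)
print(hard_first)                   # noticeably different from the first predicted value
```

The first two values agree with the listed predictions only when the gates use the plain sigmoid, which is consistent with the assumption stated above.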

Try to assemble the RNN yourself

Building on the above, here is an example of the workflow for when you think, "I want to implement and try out an LSTM variant proposed in a paper myself!"

I'd like a simple example, so I'll try the Simplified LSTM (S-LSTM) proposed in Wu (2016) [^ 6].

First, I quote the formulas from the original paper. To match the notation used elsewhere on this page, the positions of the subscripts have been changed, and the generalized $\delta, g$ have been replaced with $\sigma, \tanh$.

\begin{align}
f_t &=\sigma(W_fx_t+R_fh_{t-1}+b_f) \\
c_t &=f_t \otimes c_{t-1}+ (1-f_t) \otimes \tanh (W_c x_t+R_ch_{t-1}+b_c) \\
h_t &=\tanh (c_t)
\end{align}

Which state is it?

First, work out from the formulas what must be kept as state. Any variable whose value from the previous time step is used, that is, any variable referenced with the subscript $t-1$, must be kept as state. **So this time the states are $h_t$ and $c_t$.**

Which is the weight?

The weights (parameters you want to learn) are $ W_f, R_f, b_f, W_c, R_c, b_c $. Compared to ordinary LSTM, the number of weights is halved.
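The halving can be confirmed by counting. The helper below is a small sketch with a made-up name, `rnn_param_count`; it assumes every block (gate or candidate) has one kernel, one recurrent kernel, and one bias, as in the formulas of this article.

```python
def rnn_param_count(input_dim, units, blocks):
    """Count weights when each block has a kernel, a recurrent kernel, and a bias."""
    kernel = input_dim * units * blocks
    recurrent_kernel = units * units * blocks
    bias = units * blocks
    return kernel + recurrent_kernel + bias

for input_dim, units in [(1, 1), (8, 16)]:
    lstm = rnn_param_count(input_dim, units, blocks=4)    # o, f, i, z
    slstm = rnn_param_count(input_dim, units, blocks=2)   # f, c
    print(input_dim, units, lstm, slstm)
# 1 1 12 6
# 8 16 1600 800
```

The first row matches the scalar setting used throughout this article: 12 weights for the LSTM and 6 for the S-LSTM.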

Implementation

LSTMCell inherits Layer, but when you make it yourself, it seems better to inherit tf.keras.layers.AbstractRNNCell. tf.keras.layers.AbstractRNNCell | TensorFlow Core v2.1.0

This is the base class for implementing RNN cells with custom behavior.

For build(), adapting the LSTM implementation gives something like the following. `* 4` is changed to `* 2`, and parts that are not directly relevant (Dropout and so on) are left out.

  def build(self, input_shape):
    input_dim = input_shape[-1]
    self.kernel = self.add_weight(
        shape=(input_dim, self.units * 2),
        name='kernel',
        initializer=self.kernel_initializer,
        regularizer=self.kernel_regularizer,
        constraint=self.kernel_constraint)
    self.recurrent_kernel = self.add_weight(
        shape=(self.units, self.units * 2),
        name='recurrent_kernel',
        initializer=self.recurrent_initializer,
        regularizer=self.recurrent_regularizer,
        constraint=self.recurrent_constraint)

    if self.use_bias:
      self.bias = self.add_weight(
          shape=(self.units * 2,),
          name='bias',
          initializer=self.bias_initializer,
          regularizer=self.bias_regularizer,
          constraint=self.bias_constraint)
    else:
      self.bias = None
    self.built = True

For call(), only the processing equivalent to implementation=1 is implemented, roughly as follows. Note that inputs and states are tf.Tensor objects, so do not use ndarray operations such as np.dot. Use functions that handle Tensors instead, from tf.math, tf.linalg, tf.keras.backend, and so on. I want to get used to the Tensor object of TensorFlow - Qiita

  def call(self, inputs, states, training=None):
    h_tm1 = states[0]  # previous memory state
    c_tm1 = states[1]  # previous carry state

    k_f, k_c = array_ops.split(
          self.kernel, num_or_size_splits=2, axis=1)
    x_f = K.dot(inputs, k_f)
    x_c = K.dot(inputs, k_c)
    if self.use_bias:
      b_f, b_c = array_ops.split(
          self.bias, num_or_size_splits=2, axis=0)
      x_f = K.bias_add(x_f, b_f)
      x_c = K.bias_add(x_c, b_c)

    f = self.recurrent_activation(x_f + K.dot(
        h_tm1, self.recurrent_kernel[:, :self.units]))
    c = f * c_tm1 + (1 - f) * self.activation(x_c + K.dot(
        h_tm1, self.recurrent_kernel[:, self.units:]))

    h = self.activation(c)
    return h, [h, c]

Whole code

slstm.py


import tensorflow as tf 
import numpy as np 
from tensorflow.keras import Sequential 
from tensorflow.keras.layers import RNN, AbstractRNNCell
from tensorflow.keras.optimizers import SGD
from tensorflow.keras import activations, constraints, initializers, regularizers
from tensorflow.keras import backend as K
from tensorflow.python.ops import array_ops

class SLSTMCell(AbstractRNNCell):
  def __init__(self,
               units,
               activation='tanh',
               recurrent_activation='hard_sigmoid',
               use_bias=True,
               kernel_initializer='glorot_uniform',
               recurrent_initializer='orthogonal',
               bias_initializer='zeros',
               kernel_regularizer=None,
               recurrent_regularizer=None,
               bias_regularizer=None,
               kernel_constraint=None,
               recurrent_constraint=None,
               bias_constraint=None,
               **kwargs):

    super(SLSTMCell, self).__init__(**kwargs)
    self.units = units
    self.activation = activations.get(activation)
    self.recurrent_activation = activations.get(recurrent_activation)
    self.use_bias = use_bias

    self.kernel_initializer = initializers.get(kernel_initializer)
    self.recurrent_initializer = initializers.get(recurrent_initializer)
    self.bias_initializer = initializers.get(bias_initializer)

    self.kernel_regularizer = regularizers.get(kernel_regularizer)
    self.recurrent_regularizer = regularizers.get(recurrent_regularizer)
    self.bias_regularizer = regularizers.get(bias_regularizer)

    self.kernel_constraint = constraints.get(kernel_constraint)
    self.recurrent_constraint = constraints.get(recurrent_constraint)
    self.bias_constraint = constraints.get(bias_constraint)

  @property
  def state_size(self):
    return [self.units, self.units]

  def build(self, input_shape):
    input_dim = input_shape[-1]
    self.kernel = self.add_weight(
        shape=(input_dim, self.units * 2),
        name='kernel',
        initializer=self.kernel_initializer,
        regularizer=self.kernel_regularizer,
        constraint=self.kernel_constraint)
    self.recurrent_kernel = self.add_weight(
        shape=(self.units, self.units * 2),
        name='recurrent_kernel',
        initializer=self.recurrent_initializer,
        regularizer=self.recurrent_regularizer,
        constraint=self.recurrent_constraint)

    if self.use_bias:
      self.bias = self.add_weight(
          shape=(self.units * 2,),
          name='bias',
          initializer=self.bias_initializer,
          regularizer=self.bias_regularizer,
          constraint=self.bias_constraint)
    else:
      self.bias = None
    self.built = True

  def call(self, inputs, states, training=None):
    h_tm1 = states[0]  # previous memory state
    c_tm1 = states[1]  # previous carry state

    k_f, k_c = array_ops.split(
          self.kernel, num_or_size_splits=2, axis=1)
    x_f = K.dot(inputs, k_f)
    x_c = K.dot(inputs, k_c)
    if self.use_bias:
      b_f, b_c = array_ops.split(
          self.bias, num_or_size_splits=2, axis=0)
      x_f = K.bias_add(x_f, b_f)
      x_c = K.bias_add(x_c, b_c)

    f = self.recurrent_activation(x_f + K.dot(
        h_tm1, self.recurrent_kernel[:, :self.units]))
    c = f * c_tm1 + (1 - f) * self.activation(x_c + K.dot(
        h_tm1, self.recurrent_kernel[:, self.units:]))

    h = self.activation(c)
    return h, [h, c]

tf.random.set_seed(111)
np.random.seed(111)

model = Sequential([
    RNN(SLSTMCell(1, activation=None), input_shape=(None, 1), return_sequences=True)
])
model.compile(optimizer=SGD(learning_rate=0.0001), loss="mean_squared_error")

n = 51200
x = np.random.random((n, 30, 1))
y = x.cumsum(axis=1)

model.fit(x, y, batch_size=512, epochs=100)

model.layers[0].weights
# [<tf.Variable 'rnn/kernel:0' shape=(1, 2) dtype=float32, numpy=array([[-0.79614836,  0.03041089]], dtype=float32)>,
#  <tf.Variable 'rnn/recurrent_kernel:0' shape=(1, 2) dtype=float32, numpy=array([[0.08143749, 1.0668359 ]], dtype=float32)>,
#  <tf.Variable 'rnn/bias:0' shape=(2,) dtype=float32, numpy=array([0.6330045, 1.0431471], dtype=float32)>]

model.predict(np.ones((1, 30, 1)) * 0.5).flatten()
# array([ 0.47944844,  0.96489847,  1.4559155 ,  1.9520411 ,  2.4527955 ,
#         2.9576783 ,  3.466171  ,  3.9777386 ,  4.4918313 ,  5.007888  ,
#         5.5253367 ,  6.0435996 ,  6.5620937 ,  7.0802336 ,  7.597435  ,
#         8.113117  ,  8.626705  ,  9.13763   ,  9.645338  , 10.149284  ,
#        10.648943  , 11.143805  , 11.633378  , 12.117197  , 12.594816  ,
#        13.065814  , 13.529797  , 13.986397  , 14.435274  , 14.876117  ],
#       dtype=float32)

I'm not certain the implementation is correct, but it behaves plausibly, so let's call it OK.
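One way to gain some confidence is to replay the S-LSTM formulas by hand with the learned weights and compare against model.predict. The scalar sketch below copies the weights from the output above in the order used by the code (f, c), uses the hard_sigmoid formula from footnote 5, and assumes a zero initial state; `slstm_step` is a hypothetical helper written for this check.

```python
def hard_sigmoid(a):
    return max(0.0, min(1.0, 0.2 * a + 0.5))

# Learned weights copied from the slstm.py output above, in the order (f, c).
k_f, k_c = -0.79614836, 0.03041089
r_f, r_c = 0.08143749, 1.0668359
b_f, b_c = 0.6330045, 1.0431471

def slstm_step(x, h_prev, c_prev):
    """One scalar S-LSTM step with activation=None and hard_sigmoid gates."""
    f = hard_sigmoid(x * k_f + h_prev * r_f + b_f)
    candidate = x * k_c + h_prev * r_c + b_c       # activation=None
    c = f * c_prev + (1 - f) * candidate
    h = c                                          # activation=None
    return h, c

h, c = 0.0, 0.0                                    # assumed zero initial state
outputs = []
for _ in range(30):
    h, c = slstm_step(0.5, h, c)
    outputs.append(h)
print(outputs[:2])  # close to the first two values of model.predict above
```

Agreement between the hand calculation and the model output shows that the layer computes what the formulas say; it does not by itself show that the formulas match the paper.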

What is the initial state?

Actually, the state at the start of the input (the initial state) is $h_0 = c_0 = 0$. This is defined in `AbstractRNNCell`.

recurrent.py


  def get_initial_state(self, inputs=None, batch_size=None, dtype=None):
    return _generate_zero_filled_state_for_cell(self, inputs, batch_size, dtype)

That is fine for this problem, but if you want a different initial state, you can change it by overriding get_initial_state() in the subclass. For example, to start from $h_0 = 1$, write the following.

  def get_initial_state(self, inputs=None, batch_size=None, dtype=None):
    h_0 = tf.ones([batch_size, self.units], dtype)
    c_0 = tf.zeros([batch_size, self.units], dtype)
    return [h_0, c_0]

Try changing the label of the training data so that it is "partial sum +1".

n = 51200
x = np.random.random((n, 30, 1))
y = x.cumsum(axis=1) + 1

model.fit(x, y, batch_size=512, epochs=100)

model.predict(np.ones((1, 30, 1)) * 0.5).flatten()                      
# array([ 1.0134857,  1.5777774,  2.140834 ,  2.702304 ,  3.2618384,
#         3.8190937,  4.3737316,  4.92542  ,  5.473833 ,  6.018653 ,
#         6.5595713,  7.0962873,  7.62851  ,  8.15596  ,  8.678368 ,
#         9.195474 ,  9.707033 , 10.2128105, 10.712584 , 11.206142 ,
#        11.693292 , 12.173846 , 12.647637 , 13.114506 , 13.574308 ,
#        14.026913 , 14.472202 , 14.91007  , 15.3404255, 15.763187 ],
#       dtype=float32)

The error still looks large, but the result is roughly what was intended.

Summary

I described how to use RNNs in TensorFlow + Keras and how to customize an RNN in order to reproduce a paper. Using RNN and LSTM as a black box is not hard, but once you try to understand the internal processing, reference material (especially in Japanese) turns out to be surprisingly scarce. I hope this article lowers the barrier to working with RNNs and LSTMs.
