I referred to [this site](http://www.ie110704.net/2017/08/21/attention-seq2seq%E3%81%A7%E5%AF%BE%E8%A9%B1%E3%83%A2%E3%83%87%E3%83%AB%E3%82%92%E5%AE%9F%E8%A3%85%E3%81%97%E3%81%A6%E3%81%BF%E3%81%9F/) for personal self-study when creating a chatbot in a seminar.
- I want to pass the data in a file.
- I want to list the data passed in the file.
- I want to confirm learning by selecting from the list of data passed in the file.
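The gist of the first two points, before the full script: read a file in which each line holds one tab-separated query/response pair and turn it into a list of pairs. Below is a minimal sketch, assuming the test_data.txt format listed at the end of this post; the actual script does the same thing with re.split and numpy.reshape.

```python
# Minimal sketch (assumes test_data.txt with one "query<TAB>response" pair per line)
with open("test_data.txt", encoding="utf-8") as f:
    lines = [line.strip() for line in f if line.strip()]

data = [[[q], [r]] for q, r in (line.split("\t") for line in lines)]
print(data)  # [[['query1'], ['response1']], [['query2'], ['response2']], ...]
```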
taiwa_model_file.py

```python
import datetime
import numpy as np
import chainer
import chainer.functions as F
import chainer.links as L
import MeCab
import re

# Get data from a file
with open("test_data.txt", encoding="utf-8") as f:
    s = f.read()

# Turn the flat word list into [query, response] pairs
l = [x.strip() for x in re.split('\t|\n', s)]
l.pop(-1)
data = np.array(l).reshape(-1, 2, 1).tolist()

print("-----------------------------")
print(data)
print("-----------------------------")

gpu = -1
if gpu >= 0:  # numpy or cuda.cupy
    xp = chainer.cuda.cupy
    chainer.cuda.get_device(gpu).use()
else:
    xp = np
# Data conversion class definition
class DataConverter:

    def __init__(self, batch_col_size):
        """Class initialization

        Args:
            batch_col_size: Mini-batch word-count size used during training
        """
        self.mecab = MeCab.Tagger()  # Morphological analyzer
        self.vocab = {"<eos>": 0, "<unknown>": 1}  # Word dictionary
        self.batch_col_size = batch_col_size

    def load(self, data):
        """At training time, read the teacher data and convert it to Numpy arrays that match the mini-batch size

        Args:
            data: Dialogue data
        """
        # Register words in the word dictionary
        self.vocab = {"<eos>": 0, "<unknown>": 1}  # Initialize the word dictionary
        for d in data:
            sentences = [d[0][0], d[1][0]]  # Input sentence, reply sentence
            for sentence in sentences:
                sentence_words = self.sentence2words(sentence)  # Break the sentence down into words
                for word in sentence_words:
                    if word not in self.vocab:
                        self.vocab[word] = len(self.vocab)
        # Convert the teacher data to IDs and arrange it
        queries, responses = [], []
        for d in data:
            query, response = d[0][0], d[1][0]  # Sentence to encode, sentence to decode
            queries.append(self.sentence2ids(sentence=query, train=True, sentence_type="query"))
            responses.append(self.sentence2ids(sentence=response, train=True, sentence_type="response"))
        self.train_queries = xp.vstack(queries)
        self.train_responses = xp.vstack(responses)
    def sentence2words(self, sentence):
        """Return the sentence as an array of words

        Args:
            sentence: Sentence string
        """
        sentence_words = []
        for m in self.mecab.parse(sentence).split("\n"):  # Split into words by morphological analysis
            w = m.split("\t")[0].lower()  # Word
            if len(w) == 0 or w == "eos":  # Skip empty strings and MeCab's EOS marker
                continue
            sentence_words.append(w)
        sentence_words.append("<eos>")  # Finally append <eos>, which is registered in the vocabulary
        return sentence_words

    def sentence2ids(self, sentence, train=True, sentence_type="query"):
        """Convert the sentence to a Numpy array of word IDs and return it

        Args:
            sentence: Sentence string
            train: Whether this is for training
            sentence_type: Specify "query" or "response" to change the padding direction for training and mini-batch support
        Returns:
            ids: Numpy array of word IDs
        """
        ids = []  # Array of word IDs
        sentence_words = self.sentence2words(sentence)  # Break the sentence down into words
        for word in sentence_words:
            if word in self.vocab:  # If the word exists in the word dictionary, convert it to its ID
                ids.append(self.vocab[word])
            else:  # If the word does not exist in the word dictionary, convert it to <unknown>
                ids.append(self.vocab["<unknown>"])
        # At training time, adjust the word count and convert to Numpy for mini-batch support
        if train:
            if sentence_type == "query":  # For a query, pad with -1 at the front until the mini-batch word-count size is reached
                while len(ids) > self.batch_col_size:  # If longer than the mini-batch word-count size, cut from the front until it fits
                    ids.pop(0)
                ids = xp.array([-1] * (self.batch_col_size - len(ids)) + ids, dtype="int32")
            elif sentence_type == "response":  # For a response, pad with -1 at the back until the mini-batch word-count size is reached
                while len(ids) > self.batch_col_size:  # If longer than the mini-batch word-count size, cut from the back until it fits
                    ids.pop()
                ids = xp.array(ids + [-1] * (self.batch_col_size - len(ids)), dtype="int32")
        else:  # At prediction time, convert to Numpy as-is
            ids = xp.array([ids], dtype="int32")
        return ids

    def ids2words(self, ids):
        """At prediction time, convert a Numpy array of word IDs back to words and return them

        Args:
            ids: Numpy array of word IDs
        Returns:
            words: Array of words
        """
        words = []  # Array of words
        for i in ids:  # Look up each word ID in the word dictionary and convert it back to a word
            words.append(list(self.vocab.keys())[list(self.vocab.values()).index(i)])
        return words
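
# Usage sketch for DataConverter (illustrative values only, assuming batch_col_size=4):
#   dc = DataConverter(batch_col_size=4)
#   dc.load(data)                                         # builds dc.vocab, dc.train_queries, dc.train_responses
#   dc.sentence2ids(query, sentence_type="query")         # -> e.g. [-1, -1, 12, 0]  front-padded, ends with <eos>=0
#   dc.sentence2ids(response, sentence_type="response")   # -> e.g. [7, 25, 0, -1]   back-padded
#   dc.sentence2ids(query, train=False)                   # -> [[...]]  no padding, wrapped as a batch of 1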
# Model class definitions
# LSTM encoder class
class LSTMEncoder(chainer.Chain):

    def __init__(self, vocab_size, embed_size, hidden_size):
        """Encoder instantiation

        Args:
            vocab_size: Number of word types used
            embed_size: Size of a word as a vector representation
            hidden_size: Hidden layer size
        """
        super(LSTMEncoder, self).__init__(
            xe=L.EmbedID(vocab_size, embed_size, ignore_label=-1),
            eh=L.Linear(embed_size, 4 * hidden_size),
            hh=L.Linear(hidden_size, 4 * hidden_size)
        )

    def __call__(self, x, c, h):
        """Encoder calculation

        Args:
            x: One-hot word
            c: Internal memory
            h: Hidden layer
        Returns:
            Next internal memory, next hidden layer
        """
        e = F.tanh(self.xe(x))
        return F.lstm(c, self.eh(e) + self.hh(h))
# Attention Model + LSTM decoder class
class AttLSTMDecoder(chainer.Chain):

    def __init__(self, vocab_size, embed_size, hidden_size):
        """Decoder instantiation for the Attention Model

        Args:
            vocab_size: Vocabulary size
            embed_size: Word vector size
            hidden_size: Hidden layer size
        """
        super(AttLSTMDecoder, self).__init__(
            ye=L.EmbedID(vocab_size, embed_size, ignore_label=-1),  # Layer that converts words into word vectors
            eh=L.Linear(embed_size, 4 * hidden_size),  # Layer that transforms a word vector into a vector four times the hidden layer size
            hh=L.Linear(hidden_size, 4 * hidden_size),  # Layer that transforms the Decoder's intermediate vector into a vector four times the hidden layer size
            fh=L.Linear(hidden_size, 4 * hidden_size),  # Layer that transforms the weighted average of the forward Encoder's intermediate vectors into a vector four times the hidden layer size
            bh=L.Linear(hidden_size, 4 * hidden_size),  # Layer that transforms the weighted average of the reverse Encoder's intermediate vectors into a vector four times the hidden layer size
            he=L.Linear(hidden_size, embed_size),  # Layer that converts a hidden-layer-size vector to the size of a word vector
            ey=L.Linear(embed_size, vocab_size)  # Layer that converts a word vector to a vocabulary-size vector
        )

    def __call__(self, y, c, h, f, b):
        """Decoder calculation

        Args:
            y: Word to input to the Decoder
            c: Internal memory
            h: Decoder intermediate vector
            f: Weighted average of the forward Encoder computed by the Attention Model
            b: Weighted average of the reverse Encoder computed by the Attention Model
        Returns:
            Vocabulary-size vector, updated internal memory, updated intermediate vector
        """
        e = F.tanh(self.ye(y))  # Convert the word to a word vector
        c, h = F.lstm(c, self.eh(e) + self.hh(h) + self.fh(f) + self.bh(b))  # LSTM using the word vector, Decoder intermediate vector, forward Encoder Attention and reverse Encoder Attention
        t = self.ey(F.tanh(self.he(h)))  # Convert the intermediate vector output by the LSTM to a vocabulary-size vector
        return t, c, h
# Attention model class
class Attention(chainer.Chain):

    def __init__(self, hidden_size):
        """Attention instantiation

        Args:
            hidden_size: Hidden layer size
        """
        super(Attention, self).__init__(
            fh=L.Linear(hidden_size, hidden_size),  # Linear layer that transforms a forward Encoder intermediate vector into a hidden-layer-size vector
            bh=L.Linear(hidden_size, hidden_size),  # Linear layer that transforms a reverse Encoder intermediate vector into a hidden-layer-size vector
            hh=L.Linear(hidden_size, hidden_size),  # Linear layer that transforms the Decoder's intermediate vector into a hidden-layer-size vector
            hw=L.Linear(hidden_size, 1),  # Linear layer that converts a hidden-layer-size vector to a scalar
        )
        self.hidden_size = hidden_size  # Remember the hidden layer size

    def __call__(self, fs, bs, h):
        """Attention calculation

        Args:
            fs: List of forward Encoder intermediate vectors
            bs: List of reverse Encoder intermediate vectors
            h: Intermediate vector output by the Decoder
        Returns:
            Weighted average of the forward Encoder intermediate vectors, weighted average of the reverse Encoder intermediate vectors
        """
        batch_size = h.data.shape[0]  # Remember the mini-batch size
        ws = []  # List for recording the weights
        sum_w = chainer.Variable(xp.zeros((batch_size, 1), dtype='float32'))  # Accumulator for the total weight
        # Weight calculation using the Encoder intermediate vectors and the Decoder intermediate vector
        for f, b in zip(fs, bs):
            w = F.tanh(self.fh(f) + self.bh(b) + self.hh(h))  # Weight calculation using the forward Encoder, reverse Encoder and Decoder intermediate vectors
            w = F.exp(self.hw(w))  # Exponentiate; together with the division by sum_w below this implements a softmax
            ws.append(w)  # Record the calculated weight
            sum_w += w
        # Initialize the output weighted-average vectors
        att_f = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
        att_b = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
        for f, b, w in zip(fs, bs, ws):
            w /= sum_w  # Normalize so that the weights sum to 1
            # Add weight * Encoder intermediate vector to the output vectors
            att_f += F.reshape(F.batch_matmul(f, w), (batch_size, self.hidden_size))
            att_b += F.reshape(F.batch_matmul(b, w), (batch_size, self.hidden_size))
        return att_f, att_b
# Attention Sequence-to-Sequence model class
class AttSeq2Seq(chainer.Chain):

    def __init__(self, vocab_size, embed_size, hidden_size, batch_col_size):
        """Attention + Seq2Seq instantiation

        Args:
            vocab_size: Vocabulary size
            embed_size: Word vector size
            hidden_size: Hidden layer size
            batch_col_size: Maximum number of words to decode
        """
        super(AttSeq2Seq, self).__init__(
            f_encoder=LSTMEncoder(vocab_size, embed_size, hidden_size),  # Forward Encoder
            b_encoder=LSTMEncoder(vocab_size, embed_size, hidden_size),  # Reverse Encoder
            attention=Attention(hidden_size),  # Attention Model
            decoder=AttLSTMDecoder(vocab_size, embed_size, hidden_size)  # Decoder
        )
        self.vocab_size = vocab_size
        self.embed_size = embed_size
        self.hidden_size = hidden_size
        self.decode_max_size = batch_col_size  # Decoding ends when <eos> is output; this is the maximum number of output words if it is not
        # Initialize the lists that store the forward and reverse Encoder intermediate vectors
        self.fs = []
        self.bs = []

    def encode(self, words, batch_size):
        """Encoder calculation

        Args:
            words: List of words to use as input
            batch_size: Mini-batch size
        """
        c = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
        h = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
        # Forward Encoder calculation
        for w in words:
            c, h = self.f_encoder(w, c, h)
            self.fs.append(h)  # Record the calculated intermediate vector
        # Re-initialize the internal memory and intermediate vector
        c = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
        h = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
        # Reverse Encoder calculation
        for w in reversed(words):
            c, h = self.b_encoder(w, c, h)
            self.bs.insert(0, h)  # Record the calculated intermediate vector
        # Initialize the Decoder's internal memory and intermediate vector
        self.c = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
        self.h = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))

    def decode(self, w):
        """Decoder calculation

        Args:
            w: Word to input to the Decoder
        Returns:
            Vocabulary-size vector of predicted word scores
        """
        att_f, att_b = self.attention(self.fs, self.bs, self.h)
        t, self.c, self.h = self.decoder(w, self.c, self.h, att_f, att_b)
        return t

    def reset(self):
        """Initialize the instance variables
        """
        # Initialize the lists that record the Encoder intermediate vectors
        self.fs = []
        self.bs = []
        # Initialize the gradients
        self.zerograds()

    def __call__(self, enc_words, dec_words=None, train=True):
        """Compute the forward pass

        Args:
            enc_words: List of words in the utterance
            dec_words: List of words in the response sentence
            train: Whether this is training or prediction
        Returns:
            Total loss (training) or the predicted decoded word IDs (prediction)
        """
        enc_words = enc_words.T
        if train:
            dec_words = dec_words.T
        batch_size = len(enc_words[0])  # Record the batch size
        self.reset()  # Reset the gradients stored in the model
        enc_words = [chainer.Variable(xp.array(row, dtype='int32')) for row in enc_words]  # Convert the words of the utterance list to Variable
        self.encode(enc_words, batch_size)  # Encoder calculation
        t = chainer.Variable(xp.array([0 for _ in range(batch_size)], dtype='int32'))  # First input <eos> to the Decoder
        loss = chainer.Variable(xp.zeros((), dtype='float32'))  # Initialize the loss
        ys = []  # List of words generated by the Decoder
        # Decoder calculation
        if train:  # Compute the loss for training
            for w in dec_words:
                y = self.decode(t)  # Decode one word at a time
                t = chainer.Variable(xp.array(w, dtype='int32'))  # Convert the correct word to Variable
                loss += F.softmax_cross_entropy(y, t)  # Compute the loss by comparing the predicted word with the correct word
            return loss
        else:  # Generate the decoded sequence for prediction
            for i in range(self.decode_max_size):
                y = self.decode(t)
                y = xp.argmax(y.data)  # The output is a probability distribution, so take the word with the highest probability
                ys.append(y)
                t = chainer.Variable(xp.array([y], dtype='int32'))
                if y == 0:  # If <eos> is output, decoding is finished
                    break
            return ys
# Training
# Constants
embed_size = 100
hidden_size = 100
batch_size = 6  # Batch size for mini-batch learning
batch_col_size = 15
epoch_num = 50  # Number of epochs
N = len(data)  # Number of teacher data

# Read the teacher data
data_converter = DataConverter(batch_col_size=batch_col_size)  # Data converter
data_converter.load(data)  # Read the teacher data
vocab_size = len(data_converter.vocab)  # Number of words

# Model declaration
model = AttSeq2Seq(vocab_size=vocab_size, embed_size=embed_size, hidden_size=hidden_size, batch_col_size=batch_col_size)
opt = chainer.optimizers.Adam()
opt.setup(model)
opt.add_hook(chainer.optimizer.GradientClipping(5))
if gpu >= 0:
    model.to_gpu(gpu)
model.reset()

# Training loop
st = datetime.datetime.now()
for epoch in range(epoch_num):
    # Mini-batch learning
    perm = np.random.permutation(N)  # Random permutation of the data indices
    total_loss = 0
    for i in range(0, N, batch_size):
        enc_words = data_converter.train_queries[perm[i:i+batch_size]]
        dec_words = data_converter.train_responses[perm[i:i+batch_size]]
        model.reset()
        loss = model(enc_words=enc_words, dec_words=dec_words, train=True)
        loss.backward()
        loss.unchain_backward()
        total_loss += loss.data
        opt.update()
    if (epoch+1) % 10 == 0:
        ed = datetime.datetime.now()
        print("epoch:\t{}\ttotal loss:\t{}\ttime:\t{}".format(epoch+1, total_loss, ed-st))
        st = datetime.datetime.now()

def predict(model, query):
    enc_query = data_converter.sentence2ids(query, train=False)
    dec_response = model(enc_words=enc_query, train=False)
    response = data_converter.ids2words(dec_response)
    print(query, "=>", response)

# Select an entry from the data list to confirm learning
predict(model, str(data[0][0]))
```
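The third point was to confirm learning by selecting from the list of data passed in the file. The script above only checks data[0]; a small loop like the one below (a sketch that reuses the predict helper and the data list defined above) runs the check for every entry.

```python
# Sketch: run the trained model against every query in the data list
for i, pair in enumerate(data):
    print("data[{}]:".format(i), end=" ")
    predict(model, str(pair[0]))  # pair[0] is the query part, e.g. ['With chicken']
```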
In test_data.txt, the "upper phrase" of each haiku and its "middle phrase + lower phrase" are separated by a tab. (The haiku below are taken from the Haiku Search of the Haiku Literature Museum.)
test_data.txt

```
Isuzu River is sure to be a chicken
The first day, I wonder if the feathers are green and the peacock
Year-end salmon spilling salt
Manyoshu New Year's poem Fumo Higikoe
Not long and not short
In order to deal with the new light
I wonder if it's the first day I haven't done yet
Right Left Big mirror first practice
Take off your shoulders and accept the radish
Shoot in the eyes of En no Gyōja
Shinonome no Miya Ruins Masaden First view
```
```
-----------------------------
[[['With chicken'], ['Isuzu River']], [['First day'], ['Feather green and peacock']], [['Year-end salmon'], ['Ah, salt spills']], [['Manyoshu'], ['New Year's poem']], [['Not long'], ['Not a short love letter']], [['First light'], ['To deal with the new heart']], [['Phrase work yet'], ['I wonder if it's the first day I have nothing to do']], [['Right left'], ['Big mirror first practice']], [['Take off your shoulders'], ['Accept the radish']], [['Three days'], ['En no Gyōja's eyes']], [['Shinomeno'], ['The first view of the main shrine']]]
-----------------------------
epoch: 10 total loss: 53.870222091674805 time: 0:00:12.398713
epoch: 20 total loss: 30.608291625976562 time: 0:00:12.503922
epoch: 30 total loss: 13.965360641479492 time: 0:00:12.470424
epoch: 40 total loss: 6.161568880081177 time: 0:00:12.560850
epoch: 50 total loss: 2.741897940635681 time: 0:00:12.466510
['With chicken'] => ['Ihe', 'If', 'you have to', 'Isuzu River', '<eos>']
```