[PYTHON] Run a dialogue model with Attention Seq2Seq

Introduction

I referred to [this site](http://www.ie110704.net/2017/08/21/attention-seq2seq%E3%81%A7%E5%AF%BE%E8%A9%B1%E3%83%A2%E3%83%87%E3%83%AB%E3%82%92%E5%AE%9F%E8%A3%85%E3%81%97%E3%81%A6%E3%81%BF%E3%81%9F/) for self-study while building a chatbot in a seminar.

What I want to do here

- Pass the data in from a file.
- Turn the data read from the file into a list.
- Check what the model has learned by selecting an entry from that list.

Code

taiwa_model_file.py


import datetime
import re

import numpy as np
import chainer
import chainer.functions as F
import chainer.links as L
import MeCab

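#Read the data from a file (the with block closes the file automatically)
with open("test_data.txt",encoding="utf-8") as f:
    s = f.read()

#Split on tabs and newlines, dropping empty fields such as the one left by a trailing newline
l = [x.strip() for x in re.split('\t|\n',s) if x.strip()]

#Reshape into [[query], [response]] pairs
data = np.array(l).reshape(-1, 2,1).tolist()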

print("-----------------------------")
print(data)
print("-----------------------------")


gpu = -1
if gpu >= 0: #numpy or cuda.cupy
    xp = chainer.cuda.cupy
    chainer.cuda.get_device(gpu).use()
else:
    xp = np

#Data conversion class definition

class DataConverter:

    def __init__(self, batch_col_size):
        """Initialize the converter

        Args:
            batch_col_size: Number of words per sentence in a training mini-batch
        """
        self.mecab = MeCab.Tagger() #Morphological analyzer
        self.vocab = {"<eos>":0, "<unknown>": 1} #Word dictionary
        self.batch_col_size = batch_col_size

    def load(self, data):
        """Read the training data and convert it to NumPy arrays padded to the mini-batch word size.

        Args:
            data: Dialogue data
        """
        #Build the word dictionary
        self.vocab = {"<eos>":0, "<unknown>": 1} #Initialize word dictionary
        for d in data:
            sentences = [d[0][0], d[1][0]] #Input sentence, reply sentence
            for sentence in sentences:
                sentence_words = self.sentence2words(sentence) #Break down sentences into words
                for word in sentence_words:
                    if word not in self.vocab:
                        self.vocab[word] = len(self.vocab)
        #Convert the training data to word IDs
        queries, responses = [], []
        for d in data:
            query, response = d[0][0], d[1][0] #Encoder input sentence, decoder target sentence
            queries.append(self.sentence2ids(sentence=query, train=True, sentence_type="query"))
            responses.append(self.sentence2ids(sentence=response, train=True, sentence_type="response"))
        self.train_queries = xp.vstack(queries)
        self.train_responses = xp.vstack(responses)

    def sentence2words(self, sentence):
        """Return the sentence as an array of words

        Args:
            sentence:Sentence string
        """
        sentence_words = []
        for m in self.mecab.parse(sentence).split("\n"): #Split into words by morphological analysis
            w = m.split("\t")[0].lower() #Surface form of the word
            if len(w) == 0 or w == "eos": #Skip empty tokens and MeCab's EOS marker
                continue
            sentence_words.append(w)
        sentence_words.append("<eos>") #Finally, append <eos>, which is registered in vocab
        return sentence_words

    def sentence2ids(self, sentence, train=True, sentence_type="query"):
        """Convert the sentence to a NumPy array of word IDs and return it

        Args:
            sentence: Sentence string
            train: Whether the result is used for training
            sentence_type: "query" or "response"; selects the padding direction used to fit the mini-batch word size during training
        Returns:
            ids: NumPy array of word IDs
        """
        ids = [] #Array to convert to word ID and store
        sentence_words = self.sentence2words(sentence) #Break down sentences into words
        for word in sentence_words:
            if word in self.vocab: #If the word exists in the word dictionary, convert it to ID
                ids.append(self.vocab[word])
            else: #If the word is not in the word dictionary, convert it to <unknown>
                ids.append(self.vocab["<unknown>"])
        #During training, adjust the number of words and convert to NumPy for mini-batch support
        if train:
            if sentence_type == "query": #For a query, pad the front with -1 until the mini-batch word size is reached
                while len(ids) > self.batch_col_size: #If longer than the mini-batch word size, drop words from the beginning until it fits
                    ids.pop(0)
                ids = xp.array([-1]*(self.batch_col_size-len(ids))+ids, dtype="int32")
            elif sentence_type == "response": #For a response, pad the back with -1 until the mini-batch word size is reached
                while len(ids) > self.batch_col_size: #If longer than the mini-batch word size, drop words from the end until it fits
                    ids.pop()
                ids = xp.array(ids+[-1]*(self.batch_col_size-len(ids)), dtype="int32")
        else: #For prediction, convert to NumPy as is
            ids = xp.array([ids], dtype="int32")
        return ids

    def ids2words(self, ids):
        """At the time of prediction, convert the Numpy array of word IDs to words and return

        Args:
            ids:Numpy array of word IDs
        Returns:
            words:Array of words
        """
        words = [] #Array to store words
        for i in ids: #Refer to the word ID from the word dictionary in order and convert it to a word
            words.append(list(self.vocab.keys())[list(self.vocab.values()).index(i)])
        return words
#Model class definition

#LSTM encoder class
class LSTMEncoder(chainer.Chain):

    def __init__(self, vocab_size, embed_size, hidden_size):
        """Encoder instantiation

        Args:
            vocab_size:Number of word types used
            embed_size:The size of a word as a vector representation
            hidden_size:Hidden layer size
        """
        super(LSTMEncoder, self).__init__(
            xe = L.EmbedID(vocab_size, embed_size, ignore_label=-1),
            eh = L.Linear(embed_size, 4 * hidden_size),
            hh = L.Linear(hidden_size, 4 * hidden_size)
        )

    def __call__(self, x, c, h):
        """Encoder calculation

        Args:
            x: Word IDs
            c: Internal memory
            h: Hidden layer
        Returns:
            Next internal memory, next hidden layer
        """
        e = F.tanh(self.xe(x))
        return F.lstm(c, self.eh(e) + self.hh(h))

# Attention Model +LSTM decoder class
class AttLSTMDecoder(chainer.Chain):
    def __init__(self, vocab_size, embed_size, hidden_size):
        """Decoder instantiation for Attention Model

        Args:
            vocab_size:Vocabulary number
            embed_size:Word vector size
            hidden_size:Hidden layer size
        """
        super(AttLSTMDecoder, self).__init__(
            ye = L.EmbedID(vocab_size, embed_size, ignore_label=-1), #Layer to convert words into word vectors
            eh = L.Linear(embed_size, 4 * hidden_size), #A layer that transforms a word vector into a vector four times the size of the hidden layer
            hh = L.Linear(hidden_size, 4 * hidden_size), #A layer that transforms the Decoder's intermediate vector into a vector four times the size of the hidden layer
            fh = L.Linear(hidden_size, 4 * hidden_size), #A layer that transforms the weighted average of the forward Encoder's intermediate vector into a vector four times the size of the hidden layer
            bh = L.Linear(hidden_size, 4 * hidden_size), #A layer that transforms the weighted average of the reverse Encoder's intermediate vector into a vector four times the size of the hidden layer
            he = L.Linear(hidden_size, embed_size), #Layer that converts a hidden layer size vector to the size of a word vector
            ey = L.Linear(embed_size, vocab_size) #Layer to convert word vector to vocabulary size vector
        )

    def __call__(self, y, c, h, f, b):
        """Decoder calculation

        Args:
            y:Words to enter in Decoder
            c:Internal memory
            h:Decoder intermediate vector
            f:Weighted average of forward encoder calculated by Attention Model
            b:Weighted average of reverse encoder calculated by Attention Model
        Returns:
            Vocabulary size vector, updated internal memory, updated intermediate vector
        """
        e = F.tanh(self.ye(y)) #Convert words to word vectors
        c, h = F.lstm(c, self.eh(e) + self.hh(h) + self.fh(f) + self.bh(b)) #LSTM using word vector, Decoder intermediate vector, forward Encoder Attention, reverse Encoder Attention
        t = self.ey(F.tanh(self.he(h))) #Convert the intermediate vector output from the LSTM to a vocabulary size vector
        return t, c, h

#Attention model class
class Attention(chainer.Chain):
    def __init__(self, hidden_size):
        """Attention instantiation
        Args:
            hidden_size:Hidden layer size
        """
        super(Attention, self).__init__(
            fh = L.Linear(hidden_size, hidden_size), #A linear combination layer that transforms a forward Encoder intermediate vector into a hidden layer size vector
            bh = L.Linear(hidden_size, hidden_size), #A linear combination layer that transforms the reverse Encoder intermediate vector into a hidden layer size vector
            hh = L.Linear(hidden_size, hidden_size), #A linear combination layer that transforms the Decoder's intermediate vector into a hidden layer size vector
            hw = L.Linear(hidden_size, 1), #Linear combination layer for converting a hidden layer size vector to a scalar
        )
        self.hidden_size = hidden_size #Remember the size of the hidden layer

    def __call__(self, fs, bs, h):
        """Attention calculation

        Args:
            fs:List of forward Encoder intermediate vectors
            bs:A list of reverse Encoder intermediate vectors
            h:Intermediate vector output by Decoder
        Returns:
            Weighted average of the intermediate vectors of the forward Encoder, weighted average of the intermediate vectors of the reverse Encoder
        """
        batch_size = h.data.shape[0] #Remember the size of the mini-batch
        ws = [] #Initializing the list to record weights
        sum_w = chainer.Variable(xp.zeros((batch_size, 1), dtype='float32')) #Initialize the value to calculate the total weight
        #Weight calculation using Encoder intermediate vector and Decoder intermediate vector
        for f, b in zip(fs, bs):
            w = F.tanh(self.fh(f)+self.bh(b)+self.hh(h)) #Weight calculation using forward Encoder intermediate vector, reverse Encoder intermediate vector, and Decoder intermediate vector
            w = F.exp(self.hw(w)) #Exponentiate the score (softmax numerator); normalized by sum_w below
            ws.append(w) #Record the calculated weight
            sum_w += w
        #Initialization of output weighted average vector
        att_f = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
        att_b = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
        for f, b, w in zip(fs, bs, ws):
            w /= sum_w #Normalized so that the sum of the weights is 1.
            #weight*Add the intermediate vector of Encoder to the output vector
            att_f += F.reshape(F.batch_matmul(f, w), (batch_size, self.hidden_size))
            att_b += F.reshape(F.batch_matmul(b, w), (batch_size, self.hidden_size))
        return att_f, att_b

#Attention Sequence to Sequence Model class
class AttSeq2Seq(chainer.Chain):
    def __init__(self, vocab_size, embed_size, hidden_size, batch_col_size):
        """Instantiate Attention + Seq2Seq

        Args:
            vocab_size: Vocabulary size
            embed_size: Word vector size
            hidden_size: Hidden layer size
            batch_col_size: Number of words per sentence in a mini-batch (also used as the maximum decode length)
        """
        super(AttSeq2Seq, self).__init__(
            f_encoder = LSTMEncoder(vocab_size, embed_size, hidden_size), #Forward Encoder
            b_encoder = LSTMEncoder(vocab_size, embed_size, hidden_size), #Reverse Encoder
            attention = Attention(hidden_size), # Attention Model
            decoder = AttLSTMDecoder(vocab_size, embed_size, hidden_size) # Decoder
        )
        self.vocab_size = vocab_size
        self.embed_size = embed_size
        self.hidden_size = hidden_size
        self.decode_max_size = batch_col_size #Decoding stops when EOS is output; otherwise this is the maximum number of output words
        #Initialize the list to store the forward Encoder intermediate vector and the reverse Encoder intermediate vector
        self.fs = []
        self.bs = []

    def encode(self, words, batch_size):
        """Encoder calculation

        Args:
            words:A recorded list of words to use for input
            batch_size:Mini batch size
        """
        c = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
        h = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
        #Forward Encoder calculation
        for w in words:
            c, h = self.f_encoder(w, c, h)
            self.fs.append(h) #Record the calculated intermediate vector
        #Internal memory, intermediate vector initialization
        c = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
        h = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
        #Reverse Encoder calculation
        for w in reversed(words):
            c, h = self.b_encoder(w, c, h)
            self.bs.insert(0, h) #Record the calculated intermediate vector
        #Internal memory, intermediate vector initialization
        self.c = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
        self.h = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))

    def decode(self, w):
        """Decoder calculation

        Args:
            w:Words to enter with Decoder
        Returns:
            Vocabulary size vector for the predicted word
        """
        att_f, att_b = self.attention(self.fs, self.bs, self.h)
        t, self.c, self.h = self.decoder(w, self.c, self.h, att_f, att_b)
        return t

    def reset(self):
        """Initialize instance variables
        """
        #Initialization of the list that records the intermediate vector of the Encoder
        self.fs = []
        self.bs = []
        #Gradient initialization
        self.zerograds()

    def __call__(self, enc_words, dec_words=None, train=True):
        """A function that calculates forward propagation

        Args:
            enc_words: List of words in the input utterance
            dec_words: List of words in the response sentence
            train: Whether to train or predict
        Returns:
            Total loss (training) or list of predicted word IDs (prediction)
        """
        enc_words = enc_words.T
        if train:
            dec_words = dec_words.T
        batch_size = len(enc_words[0]) #Record batch size
        self.reset() #Reset the gradient stored in the model
        enc_words = [chainer.Variable(xp.array(row, dtype='int32')) for row in enc_words] #Change words in utterance list to Variable type
        self.encode(enc_words, batch_size) #Encoding calculation
        t = chainer.Variable(xp.array([0 for _ in range(batch_size)], dtype='int32')) #Feed <eos> (ID 0) to the decoder as the first input
        loss = chainer.Variable(xp.zeros((), dtype='float32')) #Loss initialization
        ys = [] #A list of words generated by the decoder
        #Decoder calculation
        if train: #Calculate loss for learning
            for w in dec_words:
                y = self.decode(t) #Decode word by word
                t = chainer.Variable(xp.array(w, dtype='int32')) #Convert correct word to Variable type
                loss += F.softmax_cross_entropy(y, t) #Calculate the loss by comparing the correct word with the predicted word
            return loss
        else: #Generate a decoded string for prediction
            for i in range(self.decode_max_size):
                y = self.decode(t)
                y = xp.argmax(y.data) #The output is a score for every word in the vocabulary, so take the highest-scoring word as the prediction
                ys.append(y)
                t = chainer.Variable(xp.array([y], dtype='int32'))
                if y == 0: #If EOS is output, decoding is finished.
                    break
            return ys
#Training

#Constants
embed_size = 100
hidden_size = 100
batch_size = 6 #Mini-batch size
batch_col_size = 15 #Number of words per sentence in a mini-batch
epoch_num = 50 #Number of epochs
N = len(data) #Number of training examples

#Read the training data
data_converter = DataConverter(batch_col_size=batch_col_size) #Data converter
data_converter.load(data) #Load the training data
vocab_size = len(data_converter.vocab) #Number of words

#Model declaration
model = AttSeq2Seq(vocab_size=vocab_size, embed_size=embed_size, hidden_size=hidden_size, batch_col_size=batch_col_size)
opt = chainer.optimizers.Adam()
opt.setup(model)
opt.add_hook(chainer.optimizer.GradientClipping(5))

if gpu >= 0:
    model.to_gpu(gpu)

model.reset()

#Training loop

st = datetime.datetime.now()
for epoch in range(epoch_num):

    #Mini-batch training
    perm = np.random.permutation(N) #Random permutation of the training indices
    total_loss = 0

    for i in range(0, N, batch_size):
        enc_words = data_converter.train_queries[perm[i:i+batch_size]]
        dec_words = data_converter.train_responses[perm[i:i+batch_size]]
        model.reset()
        loss = model(enc_words=enc_words, dec_words=dec_words, train=True)
        loss.backward()
        loss.unchain_backward()
        total_loss += loss.data
        opt.update()

    if (epoch+1)%10 == 0:
        ed = datetime.datetime.now()
        print("epoch:\t{}\ttotal loss:\t{}\ttime:\t{}".format(epoch+1, total_loss, ed-st))
        st = datetime.datetime.now()

def predict(model, query):
    enc_query = data_converter.sentence2ids(query, train=False)
    dec_response = model(enc_words=enc_query, train=False)
    response = data_converter.ids2words(dec_response)
    print(query, "=>", response)

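#Select an entry from the data list to check what was learned
#Note: data[0][0] is a one-element list, so str() passes its repr (e.g. "['...']"); data[0][0][0] would pass the sentence itself
predict(model, str(data[0][0]))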
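
The padding rule in `sentence2ids` can be tried in isolation. The following is a minimal sketch and not part of the original script: `pad_ids` and `PAD` are hypothetical names, and plain NumPy is assumed in place of `xp`. Queries are truncated and padded at the front, responses at the back, so that every row ends up with the same number of entries.

```python
import numpy as np

PAD = -1  # assumed padding ID; the script passes -1 as ignore_label to EmbedID


def pad_ids(ids, size, sentence_type="query"):
    """Fit a list of word IDs to exactly `size` entries (size >= 1)."""
    ids = list(ids)
    if sentence_type == "query":
        ids = ids[-size:]                      # keep the last `size` IDs
        ids = [PAD] * (size - len(ids)) + ids  # pad at the front
    else:
        ids = ids[:size]                       # keep the first `size` IDs
        ids = ids + [PAD] * (size - len(ids))  # pad at the back
    return np.array(ids, dtype="int32")


query_row = pad_ids([5, 6, 0], 5, "query")
response_row = pad_ids([5, 6, 0], 5, "response")
print(query_row.tolist(), response_row.tolist())
```

The slicing here is intended to give the same result as the `while ... pop` loops in the script: over-long queries lose words from the beginning, over-long responses from the end.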

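The weighting done by the `Attention` class amounts to a softmax over the encoder time steps followed by a weighted sum. Below is a minimal NumPy sketch under stated assumptions: the scores are taken as given (in the script they come from `hw` applied to a `tanh` of linear layers), only one encoder direction is shown, and `attention_average` is a hypothetical name.

```python
import numpy as np


def attention_average(states, scores):
    """Weighted average of encoder states.

    states: array of shape (T, batch, hidden), one hidden vector per time step
    scores: array of shape (T, batch, 1), one unnormalized score per time step
    """
    weights = np.exp(scores)                                # softmax numerator
    weights = weights / weights.sum(axis=0, keepdims=True)  # sums to 1 over time
    context = (weights * states).sum(axis=0)                # shape (batch, hidden)
    return context, weights


rng = np.random.default_rng(0)
states = rng.normal(size=(4, 2, 3))   # 4 time steps, batch of 2, hidden size 3
uniform_scores = np.zeros((4, 2, 1))  # equal scores give a plain average
context, weights = attention_average(states, uniform_scores)
print(context.shape)
```

With equal scores the result reduces to the plain mean of the states, which is a convenient sanity check for the normalization.
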
Data

Each line separates the "upper phrase" from the "middle and lower phrases" with a tab. The data below comes from the haiku search of the Haiku Literature Museum.

test_data.txt


Isuzu River is sure to be a chicken
The first day, I wonder if the feathers are green and the peacock
Year-end salmon spilling salt
Manyoshu New Year's poem Fumo Higikoe
Not long and not short
In order to deal with the new light
I wonder if it's the first day I haven't done yet
Right Left Big mirror first practice
Take off your shoulders and accept the radish
Shoot in the eyes of En no Gyōja
Shinonome no Miya Ruins Masaden First view
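
The tab-separated layout maps directly onto the nested `[[query], [response]]` structure that the script builds. A minimal sketch, assuming one pair per line with exactly one separating tab; `parse_pairs` is a hypothetical helper and the sample strings are placeholders, not the haiku data.

```python
def parse_pairs(text):
    """Turn tab-separated lines into [[query], [response]] pairs."""
    pairs = []
    for line in text.splitlines():
        line = line.strip()
        if not line:  # skip blank lines
            continue
        # Raises ValueError if a line has no tab, which surfaces malformed input early
        query, response = line.split("\t", 1)
        pairs.append([[query.strip()], [response.strip()]])
    return pairs


sample = "upper phrase A\tmiddle and lower phrase A\n\nupper phrase B\tmiddle and lower phrase B\n"
pairs = parse_pairs(sample)
print(pairs)
```

Parsing line by line rather than reshaping a flat list means a line with a missing field fails loudly instead of shifting every later pair by one.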

Execution result

-----------------------------
[[['With chicken'], ['Isuzu River']], [['First day'], ['Feather green and peacock']], [['Year-end salmon'], ['Ah, salt spills']], [['Manyoshu'], ['New Year's poem']], [['Not long'], ['Not a short love letter']], [['First light'], ['To deal with the new heart']], [['Phrase work yet'], ['I wonder if it's the first day I have nothing to do']], [['Right left'], ['Big mirror first practice']], [['Take off your shoulders'], ['Accept the radish']], [['Three days'], ['En no Gyōja's eyes']], [['Shinomeno'], ['The first view of the main shrine']]]
-----------------------------
epoch:	10	total loss:	53.870222091674805	time:	0:00:12.398713
epoch:	20	total loss:	30.608291625976562	time:	0:00:12.503922
epoch:	30	total loss:	13.965360641479492	time:	0:00:12.470424
epoch:	40	total loss:	6.161568880081177	time:	0:00:12.560850
epoch:	50	total loss:	2.741897940635681	time:	0:00:12.466510
['With chicken'] => ['Ihe', 'If', 'you have to', 'Isuzu River', '<eos>']
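
The word list in the last line is recovered from predicted word IDs by a reverse lookup in the vocabulary. A minimal sketch of that lookup with an inverted dictionary; the vocabulary here is a made-up example (the real one is built in `DataConverter.load`), and `ids_to_words` is a hypothetical name.

```python
# Hypothetical vocabulary for illustration only
vocab = {"<eos>": 0, "<unknown>": 1, "hello": 2, "world": 3}

# Invert once so each ID lookup is a single dictionary access
id2word = {i: w for w, i in vocab.items()}


def ids_to_words(ids):
    """Map word IDs (plain or NumPy integers) back to words."""
    return [id2word[int(i)] for i in ids]


words = ids_to_words([2, 3, 0])
print(words)
```

Inverting the dictionary once avoids rebuilding the key and value lists for every ID, which is what the `ids2words` method in the script does.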
