[PYTHON] Attention Seq2 Exécutez le modèle de dialogue avec Seq

en premier

[Ce site](http://www.ie110704.net/2017/08/21/attention-seq2seq%E3%81%A7%E5] pour l'auto-apprentissage personnel lors de la création d'un chatbot dans un séminaire % AF% BE% E8% A9% B1% E3% 83% A2% E3% 83% 87% E3% 83% AB% E3% 82% 92% E5% AE% 9F% E8% A3% 85% E3% 81 % 97% E3% 81% A6% E3% 81% BF% E3% 81% 9F /) a été mentionnée.

Ce que tu veux faire ici

--Je veux transmettre des données dans un fichier. --Je veux lister les données passées dans le fichier. --Je souhaite confirmer l'apprentissage en sélectionnant dans la liste des données passées dans le fichier.

code

taiwa_model_file.py


import datetime
import numpy as np
import chainer
import chainer.functions as F
import chainer.links as L
import MeCab
import numpy as np
import re

#Récupérer les données d'un fichier
with open("test_data.txt",encoding="utf-8") as f:
    s = f.read()
f.close()

#Données de liste
l = [x.strip() for x in re.split('\t|\n',s)]
l.pop(-1)

data = np.array(l).reshape(-1, 2,1).tolist()

print("-----------------------------")
print(data)
print("-----------------------------")


gpu = -1
if gpu >= 0: #numpy ou cuda.cupy
    xp = chainer.cuda.cupy
    chainer.cuda.get_device(gpu).use()
else:
    xp = np

#Définition de la classe de transformation de données

class DataConverter:

    def __init__(self, batch_col_size):
        """Initialisation de classe

        Args:
            batch_col_size:Taille du nombre de mots du mini-lot pendant l'apprentissage
        """
        self.mecab = MeCab.Tagger() #Analyseur morphologique
        self.vocab = {"<eos>":0, "<unknown>": 1} #Dictionnaire de mots
        self.batch_col_size = batch_col_size

    def load(self, data):
        """Pendant la formation, lisez les données de l'enseignant et convertissez-les en un tableau Numpy correspondant à la taille du mini-lot

        Args:
            data:Données de dialogue
        """
        #Enregistrement du dictionnaire de mots
        self.vocab = {"<eos>":0, "<unknown>": 1} #Initialiser le dictionnaire de mots
        for d in data:
            sentences = [d[0][0], d[1][0]] #Texte d'entrée, texte de réponse
            for sentence in sentences:
                sentence_words = self.sentence2words(sentence) #Décomposer les phrases en mots
                for word in sentence_words:
                    if word not in self.vocab:
                        self.vocab[word] = len(self.vocab)
        #Identification et organisation des données enseignants
        queries, responses = [], []
        for d in data:
            query, response = d[0][0], d[1][0] #Instruction d'encodage, instruction de décodage
            queries.append(self.sentence2ids(sentence=query, train=True, sentence_type="query"))
            responses.append(self.sentence2ids(sentence=response, train=True, sentence_type="response"))
        self.train_queries = xp.vstack(queries)
        self.train_responses = xp.vstack(responses)

    def sentence2words(self, sentence):
        """Renvoie la phrase sous forme de tableau de mots

        Args:
            sentence:Chaîne de phrase
        """
        sentence_words = []
        for m in self.mecab.parse(sentence).split("\n"): #Décomposer en mots par analyse morphologique
            w = m.split("\t")[0].lower() #mot
            if len(w) == 0 or w == "eos": #Caractères illégaux, EOS omis
                continue
            sentence_words.append(w)
        sentence_words.append("<eos>") #Enfin enregistré en vocal<eos>Remplacer
        return sentence_words

    def sentence2ids(self, sentence, train=True, sentence_type="query"):
        """Convertissez des phrases en tableau Numpy d'ID de mot et retournez

        Args:
            sentence:Chaîne de phrase
            train:Que ce soit pour apprendre
            sentence_type:Pour changer la direction de compensation de taille pour l'apprentissage et pour la prise en charge des mini-lots avec réponse à la requête"query"or"response"Spécifier
        Returns:
            ids:Tableau Numpy d'ID de mot
        """
        ids = [] #Un tableau qui est converti en un identifiant de mot et stocké
        sentence_words = self.sentence2words(sentence) #Décomposer les phrases en mots
        for word in sentence_words:
            if word in self.vocab: #Si le mot existe dans le dictionnaire de mots, convertissez-le en ID
                ids.append(self.vocab[word])
            else: #Si le mot n'existe pas dans le dictionnaire de mots<unknown>Convertir en
                ids.append(self.vocab["<unknown>"])
        #Lors de l'apprentissage, ajustez la taille du nombre de mots et convertissez-le en Numpy pour la prise en charge des mini-lots
        if train:
            if sentence_type == "query": #Dans le cas d'une requête, jusqu'à ce que la taille du numéro de mot du mini-lot soit atteinte avant-Compenser pour 1
                while len(ids) > self.batch_col_size: #S'il est plus grand que la taille du mot du mini-lot, coupez-le depuis le début jusqu'à ce qu'il atteigne la taille du mot du mini-lot.
                    ids.pop(0)
                ids = xp.array([-1]*(self.batch_col_size-len(ids))+ids, dtype="int32")
            elif sentence_type == "response": #Dans le cas de réponse, jusqu'à ce qu'il atteigne la taille du nombre de mots du mini-lot vers l'arrière-Compenser pour 1
                while len(ids) > self.batch_col_size: #S'il est plus grand que la taille de mot du mini-lot, coupez-le de la fin jusqu'à ce qu'il atteigne la taille du mot du mini-lot.
                    ids.pop()
                ids = xp.array(ids+[-1]*(self.batch_col_size-len(ids)), dtype="int32")
        else: #Au moment de la prédiction, convertissez en Numpy tel quel
            ids = xp.array([ids], dtype="int32")
        return ids

    def ids2words(self, ids):
        """Au moment de la prédiction, convertissez le tableau Numpy d'ID de mot en mots et renvoyez

        Args:
            ids:Tableau Numpy d'ID de mot
        Returns:
            words:Disposition des mots
        """
        words = [] #Tableau pour stocker les mots
        for i in ids: #Reportez-vous à l'identifiant du mot du dictionnaire de mots dans l'ordre et convertissez-le en mot
            words.append(list(self.vocab.keys())[list(self.vocab.values()).index(i)])
        return words
#Définition de classe de modèle

#Classe d'encodeur LSTM
class LSTMEncoder(chainer.Chain):

    def __init__(self, vocab_size, embed_size, hidden_size):
        """Instanciation du codeur

        Args:
            vocab_size:Nombre de types de mots utilisés
            embed_size:Taille des mots dans la représentation vectorielle
            hidden_size:Taille du calque masqué
        """
        super(LSTMEncoder, self).__init__(
            xe = L.EmbedID(vocab_size, embed_size, ignore_label=-1),
            eh = L.Linear(embed_size, 4 * hidden_size),
            hh = L.Linear(hidden_size, 4 * hidden_size)
        )

    def __call__(self, x, c, h):
        """Calcul du codeur

        Args:
            x: one-mot chaud
            c:Mémoire interne
            h:Couche cachée
        Returns:
Mémoire interne suivante, couche cachée suivante
        """
        e = F.tanh(self.xe(x))
        return F.lstm(c, self.eh(e) + self.hh(h))

# Attention Model +Classe de décodeur LSTM
class AttLSTMDecoder(chainer.Chain):
    def __init__(self, vocab_size, embed_size, hidden_size):
        """Instanciation du décodeur pour le modèle Attention

        Args:
            vocab_size:Nombre de vocabulaire
            embed_size:Taille du vecteur Word
            hidden_size:Taille du calque masqué
        """
        super(AttLSTMDecoder, self).__init__(
            ye = L.EmbedID(vocab_size, embed_size, ignore_label=-1), #Couche pour convertir des mots en vecteurs de mots
            eh = L.Linear(embed_size, 4 * hidden_size), #Un calque qui transforme un vecteur de mot en un vecteur quatre fois plus grand que le calque masqué
            hh = L.Linear(hidden_size, 4 * hidden_size), #Un calque qui transforme le vecteur intermédiaire du décodeur en un vecteur quatre fois plus grand que le calque caché
            fh = L.Linear(hidden_size, 4 * hidden_size), #Un calque qui transforme la moyenne pondérée du vecteur intermédiaire de l'encodeur avant en un vecteur quatre fois la taille du calque caché
            bh = L.Linear(hidden_size, 4 * hidden_size), #Un calque qui transforme la moyenne pondérée du vecteur intermédiaire de l'encodeur avant en un vecteur quatre fois la taille du calque caché
            he = L.Linear(hidden_size, embed_size), #Un calque qui convertit un vecteur de taille de calque masqué en la taille d'un vecteur de mot
            ey = L.Linear(embed_size, vocab_size) #Couche pour convertir le vecteur de mot en vecteur de taille de vocabulaire
        )

    def __call__(self, y, c, h, f, b):
        """Calcul du décodeur

        Args:
            y:Mots à entrer dans le décodeur
            c:Mémoire interne
            h:Vecteur intermédiaire décodeur
            f:Moyenne pondérée du codeur direct calculé par le modèle Attention
            b:Moyenne pondérée du codeur inversé calculée par Attention Model
        Returns:
Dictionnaire de la taille du vocabulaire, mémoire interne mise à jour, vecteur intermédiaire mis à jour
        """
        e = F.tanh(self.ye(y)) #Convertir des mots en vecteurs de mots
        c, h = F.lstm(c, self.eh(e) + self.hh(h) + self.fh(f) + self.bh(b)) #LSTM avec vecteur de mots, vecteur intermédiaire du décodeur, Attention du codeur avant, Attention du codeur inverse
        t = self.ey(F.tanh(self.he(h))) #Convertir la sortie vectorielle intermédiaire de LSTM en vecteur de taille lexicale
        return t, c, h

#Classe de modèle d'attention
class Attention(chainer.Chain):
    def __init__(self, hidden_size):
        """Instanciation de l'attention
        Args:
            hidden_size:Taille du calque masqué
        """
        super(Attention, self).__init__(
            fh = L.Linear(hidden_size, hidden_size), #Une couche de couplage linéaire qui transforme le vecteur intermédiaire de l'encodeur avant en un vecteur de taille de couche caché
            bh = L.Linear(hidden_size, hidden_size), #Une couche de couplage linéaire qui transforme le vecteur intermédiaire de l'encodeur inverse en un vecteur de taille de couche caché
            hh = L.Linear(hidden_size, hidden_size), #Une couche de couplage linéaire qui transforme le vecteur intermédiaire du décodeur en un vecteur de taille de couche caché
            hw = L.Linear(hidden_size, 1), #Couche de couplage linéaire pour convertir des vecteurs de taille de couche cachés en scalaires
        )
        self.hidden_size = hidden_size #Souvenez-vous de la taille du calque caché

    def __call__(self, fs, bs, h):
        """Calcul de l'attention

        Args:
            fs:Une liste de vecteurs intermédiaires d'encodeur avant
            bs:Liste des vecteurs intermédiaires du codeur inverse
            h:Sortie vectorielle intermédiaire par le décodeur
        Returns:
Moyenne pondérée du vecteur intermédiaire du codeur direct, moyenne pondérée du vecteur intermédiaire du codeur inverse
        """
        batch_size = h.data.shape[0] #Rappelez-vous la taille du mini lot
        ws = [] #Initialisation de la liste pour enregistrer les poids
        sum_w = chainer.Variable(xp.zeros((batch_size, 1), dtype='float32')) #Initialisez la valeur pour calculer la valeur totale du poids
        #Calcul du poids à l'aide du vecteur intermédiaire de l'encodeur et du vecteur intermédiaire du décodeur
        for f, b in zip(fs, bs):
            w = F.tanh(self.fh(f)+self.bh(b)+self.hh(h)) #Calcul du poids à l'aide du vecteur intermédiaire du codeur direct, du vecteur intermédiaire du codeur inverse, du vecteur intermédiaire du décodeur
            w = F.exp(self.hw(w)) #Normaliser à l'aide de la fonction softmax
            ws.append(w) #Enregistrez le poids calculé
            sum_w += w
        #Initialisation du vecteur moyen pondéré en sortie
        att_f = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
        att_b = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
        for f, b, w in zip(fs, bs, ws):
            w /= sum_w #Normalisé pour que la somme des poids soit 1.
            #poids*Ajouter le vecteur intermédiaire de l'encodeur au vecteur de sortie
            att_f += F.reshape(F.batch_matmul(f, w), (batch_size, self.hidden_size))
            att_b += F.reshape(F.batch_matmul(b, w), (batch_size, self.hidden_size))
        return att_f, att_b

#Séquence d'attention à la classe de modèle de séquence
class AttSeq2Seq(chainer.Chain):
    def __init__(self, vocab_size, embed_size, hidden_size, batch_col_size):
        """Attention +Instanciation Seq2 Seq

        Args:
            vocab_size:Nombre de taille de vocabulaire
            embed_size:Taille du vecteur Word
            hidden_size:Taille du calque masqué
        """
        super(AttSeq2Seq, self).__init__(
            f_encoder = LSTMEncoder(vocab_size, embed_size, hidden_size), #Encodeur avant
            b_encoder = LSTMEncoder(vocab_size, embed_size, hidden_size), #Encodeur inversé
            attention = Attention(hidden_size), # Attention Model
            decoder = AttLSTMDecoder(vocab_size, embed_size, hidden_size) # Decoder
        )
        self.vocab_size = vocab_size
        self.embed_size = embed_size
        self.hidden_size = hidden_size
        self.decode_max_size = batch_col_size #Le décodage se termine lorsque EOS est sorti, nombre maximum de vocabulaire de sortie lorsqu'il n'est pas sorti
        #Initialisez la liste pour stocker le vecteur intermédiaire de l'encodeur avant et le vecteur intermédiaire de l'encodeur inverse
        self.fs = []
        self.bs = []

    def encode(self, words, batch_size):
        """Calcul du codeur

        Args:
            words:Une liste enregistrée de mots à utiliser dans votre saisie
            batch_size:Mini taille de lot
        """
        c = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
        h = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
        #Calcul du codeur avant
        for w in words:
            c, h = self.f_encoder(w, c, h)
            self.fs.append(h) #Enregistrer le vecteur intermédiaire calculé
        #Mémoire interne, initialisation du vecteur intermédiaire
        c = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
        h = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
        #Calcul du codeur inversé
        for w in reversed(words):
            c, h = self.b_encoder(w, c, h)
            self.bs.insert(0, h) #Enregistrer le vecteur intermédiaire calculé
        #Mémoire interne, initialisation du vecteur intermédiaire
        self.c = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
        self.h = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))

    def decode(self, w):
        """Calcul du décodeur

        Args:
            w:Mots à saisir avec Decoder
        Returns:
Mot prédictif
        """
        att_f, att_b = self.attention(self.fs, self.bs, self.h)
        t, self.c, self.h = self.decoder(w, self.c, self.h, att_f, att_b)
        return t

    def reset(self):
        """Initialiser les variables d'instance
        """
        #Initialisation de la liste qui enregistre le vecteur intermédiaire de l'encodeur
        self.fs = []
        self.bs = []
        #Initialisation du gradient
        self.zerograds()

    def __call__(self, enc_words, dec_words=None, train=True):
        """Une fonction qui calcule la propagation vers l'avant

        Args:
            enc_words:Une liste de mots prononcés
            dec_words:Une liste de mots dans la phrase de réponse
            train:Apprentissage ou prédiction
        Returns:
Perte totale calculée ou chaîne de décodage prévue
        """
        enc_words = enc_words.T
        if train:
            dec_words = dec_words.T
        batch_size = len(enc_words[0]) #Enregistrer la taille du lot
        self.reset() #Réinitialiser le dégradé stocké dans le modèle
        enc_words = [chainer.Variable(xp.array(row, dtype='int32')) for row in enc_words] #Changer les mots de la liste d'énoncés en type variable
        self.encode(enc_words, batch_size) #Calcul d'encodage
        t = chainer.Variable(xp.array([0 for _ in range(batch_size)], dtype='int32')) # <eos>Vers le décodeur
        loss = chainer.Variable(xp.zeros((), dtype='float32')) #Initialisation de la perte
        ys = [] #Une liste de mots générés par le décodeur
        #Calcul du décodeur
        if train: #Calculer la perte d'apprentissage
            for w in dec_words:
                y = self.decode(t) #Décoder mot par mot
                t = chainer.Variable(xp.array(w, dtype='int32')) #Convertir le mot correct en type variable
                loss += F.softmax_cross_entropy(y, t) #Calculez la perte en comparant le mot correct avec le mot prédit
            return loss
        else: #Générer une chaîne décodée pour la prédiction
            for i in range(self.decode_max_size):
                y = self.decode(t)
                y = xp.argmax(y.data) #Puisqu'il est toujours produit avec probabilité, obtenez le mot prédit avec une probabilité élevée
                ys.append(y)
                t = chainer.Variable(xp.array([y], dtype='int32'))
                if y == 0: #Lors de la sortie d'EOS, le décodage est terminé.
                    break
            return ys
#Apprentissage

#constant
embed_size = 100
hidden_size = 100
batch_size = 6 #Nombre de tailles de lots pour l'apprentissage par mini-lots
batch_col_size = 15
epoch_num = 50 #Nombre d'époques
N = len(data) #Nombre de données sur les enseignants

#Lire les données des enseignants
data_converter = DataConverter(batch_col_size=batch_col_size) #Convertisseur de données
data_converter.load(data) #Lire les données des enseignants
vocab_size = len(data_converter.vocab) #Nombre de mots

#Modèle de déclaration
model = AttSeq2Seq(vocab_size=vocab_size, embed_size=embed_size, hidden_size=hidden_size, batch_col_size=batch_col_size)
opt = chainer.optimizers.Adam()
opt.setup(model)
opt.add_hook(chainer.optimizer.GradientClipping(5))

if gpu >= 0:
    model.to_gpu(gpu)

model.reset()

#Apprentissage

st = datetime.datetime.now()
for epoch in range(epoch_num):

    #Mini apprentissage par lots
    perm = np.random.permutation(N) #Obtenez une liste aléatoire de colonnes entières
    total_loss = 0

    for i in range(0, N, batch_size):
        enc_words = data_converter.train_queries[perm[i:i+batch_size]]
        dec_words = data_converter.train_responses[perm[i:i+batch_size]]
        model.reset()
        loss = model(enc_words=enc_words, dec_words=dec_words, train=True)
        loss.backward()
        loss.unchain_backward()
        total_loss += loss.data
        opt.update()

    if (epoch+1)%10 == 0:
        ed = datetime.datetime.now()
        print("epoch:\t{}\ttotal loss:\t{}\ttime:\t{}".format(epoch+1, total_loss, ed-st))
        st = datetime.datetime.now()

def predict(model, query):
    enc_query = data_converter.sentence2ids(query, train=False)
    dec_response = model(enc_words=enc_query, train=False)
    response = data_converter.ids2words(dec_response)
    print(query, "=>", response)

#Sélectionnez dans une liste de données pour confirmer l'apprentissage
predict(model, str(data[0][0]))

Les données

Séparez les mots "phrase supérieure" et "phrase du milieu, phrase inférieure" dans les onglets Les données ci-dessous sont Recherche Haiku de Association Haijin / Musée de la littérature Haiku. ) De

test_data.txt


Assurez-vous de l'appeler un poulet
Le premier jour, je me demande si c'est une cacahuète
Saumon de fin d'année renversant du sel
Poème du Nouvel An de Manyoshu
Ni long ni court
Afin de faire face à la nouvelle lumière
Je me demande si c'est le premier jour que je n'ai pas encore fait
Droite Gauche Grand miroir première pratique
Enlève tes épaules et accepte le radis
Trois jours sont abattus dans les yeux de Kokaku
Shinonome no Miya Ruins Masaden Première vue

Résultat d'exécution

-----------------------------
[[['Avec du poulet'], ['Assurez-vous d'aller à Isuzugawa']], [['Premier jour'], ['Vert plume et paon']], [['Saumon de fin d'année'], ['Ah, répandre du sel']], [['Manyoshu'], ['Poème du nouvel an']], [['Pas longtemps'], ['Pas une question courte']], [['Première lumière'], ['Pour faire face à de nouveaux cœurs']], [['Encore'], ['Je me demande si c'est le premier jour où je n'ai rien à faire']], [['Droite gauche'], ['Première pratique du grand miroir']], [['Enlève ton épaule'], ['Acceptez le radis']], [['Trois jours'], ['Tirez dans les yeux du rôle petit angle']], [['Shinomeno'], ['La première vue du sanctuaire principal']]]
-----------------------------
epoch:	10	total loss:	53.870222091674805	time:	0:00:12.398713
epoch:	20	total loss:	30.608291625976562	time:	0:00:12.503922
epoch:	30	total loss:	13.965360641479492	time:	0:00:12.470424
epoch:	40	total loss:	6.161568880081177	time:	0:00:12.560850
epoch:	50	total loss:	2.741897940635681	time:	0:00:12.466510
['Avec du poulet'] => ['Ihe', 'Si', 'vous devez', 'Rivière Isuzu', '<eos>']

Recommended Posts

Attention Seq2 Exécutez le modèle de dialogue avec Seq
Calibrer le modèle avec PyCaret
Exécutez l'application avec Flask + Heroku
Validez le modèle d'entraînement avec Pylearn2
Seq2Seq (2) ~ Attention Model edition ~ avec chainer
Exécutez IDCF Cloud CLI sur Docker
Exécutez le YOLO original avec Jetson Nano
Un modèle qui identifie la guitare avec fast.ai
Présentation du modèle DCGAN pour Cifar 10 avec keras
Résolution du modèle Lorenz 96 avec Julia et Python
Chargez le fichier de modèle TensorFlow .pb avec readNetFromTensorflow ().
Exécutez avec CentOS7 + Apache2.4 + Python3.6 pour le moment
Surveiller le modèle d'entraînement avec TensorBord sur Jupyter Notebook
Exécutez Python avec VBA
Exécutez prepDE.py avec python3
Montage du modèle avec lmfit
Exécutez Blender avec python
Régression avec un modèle linéaire
Exécutez iperf avec python
Implémenter le modèle mathématique «modèle SIR» des maladies infectieuses avec Open Modelica
Exécutez l'intelligence de votre propre bibliothèque python avec VScode.
Profitez du modèle Gray-Scott avec un code court utilisant le calcul matriciel
Analysez le modèle thématique pour devenir romancier avec GensimPy3