[Diese Site](http://www.ie110704.net/2017/08/21/attention-seq2seq%E3%81%A7%E5] zum persönlichen Selbststudium beim Erstellen eines Chatbots in einem Seminar % AF% BE% E8% A9% B1% E3% 83% A2% E3% 83% 87% E3% 83% AB% E3% 82% 92% E5% AE% 9F% E8% A3% 85% E3% 81 % 97% E3% 81% A6% E3% 81% BF% E3% 81% 9F /).
taiwa_model_file.py
import datetime
import numpy as np
import chainer
import chainer.functions as F
import chainer.links as L
import MeCab
import numpy as np
import re
#Daten aus einer Datei abrufen
with open("test_data.txt",encoding="utf-8") as f:
s = f.read()
f.close()
#Daten auflisten
l = [x.strip() for x in re.split('\t|\n',s)]
l.pop(-1)
data = np.array(l).reshape(-1, 2,1).tolist()
print("-----------------------------")
print(data)
print("-----------------------------")
gpu = -1
if gpu >= 0: #numpy oder cuda.Cupy
xp = chainer.cuda.cupy
chainer.cuda.get_device(gpu).use()
else:
xp = np
#Definition der Datentransformationsklasse
class DataConverter:
def __init__(self, batch_col_size):
"""Klasseninitialisierung
Args:
batch_col_size:Mini-Batch-Wortnummerngröße während des Lernens
"""
self.mecab = MeCab.Tagger() #Morphologischer Analysator
self.vocab = {"<eos>":0, "<unknown>": 1} #Wortwörterbuch
self.batch_col_size = batch_col_size
def load(self, data):
"""Lesen Sie zum Zeitpunkt des Trainings die Lehrerdaten und konvertieren Sie sie in ein Numpy-Array, das der Mini-Batch-Größe entspricht.
Args:
data:Dialogdaten
"""
#Word Wörterbuch Registrierung
self.vocab = {"<eos>":0, "<unknown>": 1} #Wortwörterbuch initialisieren
for d in data:
sentences = [d[0][0], d[1][0]] #Eingabetext, Antworttext
for sentence in sentences:
sentence_words = self.sentence2words(sentence) #Zerlegen Sie Sätze in Wörter
for word in sentence_words:
if word not in self.vocab:
self.vocab[word] = len(self.vocab)
#Identifizierung und Organisation von Lehrerdaten
queries, responses = [], []
for d in data:
query, response = d[0][0], d[1][0] #Anweisung codieren, Anweisung decodieren
queries.append(self.sentence2ids(sentence=query, train=True, sentence_type="query"))
responses.append(self.sentence2ids(sentence=response, train=True, sentence_type="response"))
self.train_queries = xp.vstack(queries)
self.train_responses = xp.vstack(responses)
def sentence2words(self, sentence):
"""Geben Sie den Satz als Array von Wörtern zurück
Args:
sentence:Satzzeichenfolge
"""
sentence_words = []
for m in self.mecab.parse(sentence).split("\n"): #Durch morphologische Analyse in Worte zerlegen
w = m.split("\t")[0].lower() #Wort
if len(w) == 0 or w == "eos": #Unzulässige Zeichen, EOS weggelassen
continue
sentence_words.append(w)
sentence_words.append("<eos>") #Endlich in Gesang registriert<eos>Ersatz
return sentence_words
def sentence2ids(self, sentence, train=True, sentence_type="query"):
"""Konvertieren Sie Sätze in ein Numpy-Array von Wort-IDs und kehren Sie zurück
Args:
sentence:Satzzeichenfolge
train:Ob zum Lernen
sentence_type:Ändern der Größenkompensationsrichtung für das Lernen und die Mini-Batch-Unterstützung mit Abfrageantwort"query"or"response"Angeben
Returns:
ids:Numpy Array von Wort-IDs
"""
ids = [] #Ein Array, das in eine Wort-ID konvertiert und gespeichert wird
sentence_words = self.sentence2words(sentence) #Zerlegen Sie Sätze in Wörter
for word in sentence_words:
if word in self.vocab: #Wenn das Wort im Wortwörterbuch vorhanden ist, konvertieren Sie es in ID
ids.append(self.vocab[word])
else: #Wenn das Wort im Wortwörterbuch nicht vorhanden ist<unknown>Konvertieren zu
ids.append(self.vocab["<unknown>"])
#Passen Sie beim Lernen die Wortnummerngröße an und konvertieren Sie sie zur Unterstützung von Mini-Batches in Numpy
if train:
if sentence_type == "query": #Im Falle einer Abfrage, bis die Größe der Mini-Batch-Wortnummer vorwärts erreicht ist-1 kompensieren
while len(ids) > self.batch_col_size: #Wenn es größer als die Mini-Batch-Wortgröße ist, schneiden Sie es von Anfang an ab, bis es die Mini-Batch-Wortgröße erreicht.
ids.pop(0)
ids = xp.array([-1]*(self.batch_col_size-len(ids))+ids, dtype="int32")
elif sentence_type == "response": #Im Falle einer Antwort, bis die Größe der Anzahl der Mini-Batch-Wörter rückwärts erreicht ist-1 kompensieren
while len(ids) > self.batch_col_size: #Wenn es größer als die Mini-Batch-Wortgröße ist, schneiden Sie es vom Ende aus, bis es die Mini-Batch-Wortgröße erreicht.
ids.pop()
ids = xp.array(ids+[-1]*(self.batch_col_size-len(ids)), dtype="int32")
else: #Konvertieren Sie zum Zeitpunkt der Vorhersage in Numpy, wie es ist
ids = xp.array([ids], dtype="int32")
return ids
def ids2words(self, ids):
"""Konvertieren Sie zum Zeitpunkt der Vorhersage das Numpy-Array von Wort-IDs in Wörter und kehren Sie zurück
Args:
ids:Numpy Array von Wort-IDs
Returns:
words:Anordnung der Wörter
"""
words = [] #Array zum Speichern von Wörtern
for i in ids: #Beziehen Sie sich in der angegebenen Reihenfolge auf die Wort-ID aus dem Wortwörterbuch und konvertieren Sie sie in ein Wort
words.append(list(self.vocab.keys())[list(self.vocab.values()).index(i)])
return words
#Modellklassendefinition
#LSTM-Encoderklasse
class LSTMEncoder(chainer.Chain):
def __init__(self, vocab_size, embed_size, hidden_size):
"""Encoder-Instanziierung
Args:
vocab_size:Anzahl der verwendeten Worttypen
embed_size:Größe der Wörter in der Vektordarstellung
hidden_size:Versteckte Ebenengröße
"""
super(LSTMEncoder, self).__init__(
xe = L.EmbedID(vocab_size, embed_size, ignore_label=-1),
eh = L.Linear(embed_size, 4 * hidden_size),
hh = L.Linear(hidden_size, 4 * hidden_size)
)
def __call__(self, x, c, h):
"""Geberberechnung
Args:
x: one-heißes Wort
c:Interner Speicher
h:Versteckte Ebene
Returns:
Nächster interner Speicher, nächste versteckte Schicht
"""
e = F.tanh(self.xe(x))
return F.lstm(c, self.eh(e) + self.hh(h))
# Attention Model +LSTM-Decoderklasse
class AttLSTMDecoder(chainer.Chain):
def __init__(self, vocab_size, embed_size, hidden_size):
"""Decoder-Instanziierung für das Aufmerksamkeitsmodell
Args:
vocab_size:Anzahl der Vokabeln
embed_size:Wortvektorgröße
hidden_size:Versteckte Ebenengröße
"""
super(AttLSTMDecoder, self).__init__(
ye = L.EmbedID(vocab_size, embed_size, ignore_label=-1), #Ebene zum Konvertieren von Wörtern in Wortvektoren
eh = L.Linear(embed_size, 4 * hidden_size), #Eine Ebene, die einen Wortvektor in einen Vektor umwandelt, der viermal so groß ist wie die verborgene Ebene
hh = L.Linear(hidden_size, 4 * hidden_size), #Eine Ebene, die den Zwischenvektor des Decoders in einen Vektor umwandelt, der viermal so groß ist wie die verborgene Ebene
fh = L.Linear(hidden_size, 4 * hidden_size), #Eine Ebene, die den gewichteten Durchschnitt des Zwischenvektors des Vorwärtscodierers in einen Vektor umwandelt, der viermal so groß ist wie die verborgene Ebene
bh = L.Linear(hidden_size, 4 * hidden_size), #Eine Ebene, die den gewichteten Durchschnitt des Zwischenvektors des Vorwärtscodierers in einen Vektor umwandelt, der viermal so groß ist wie die verborgene Ebene
he = L.Linear(hidden_size, embed_size), #Eine Ebene, die einen versteckten Ebenengrößenvektor in die Größe eines Wortvektors konvertiert
ey = L.Linear(embed_size, vocab_size) #Ebene zum Konvertieren des Wortvektors in einen Vokabulargrößenvektor
)
def __call__(self, y, c, h, f, b):
"""Decoderberechnung
Args:
y:Wörter, die in Decoder eingegeben werden müssen
c:Interner Speicher
h:Decoder-Zwischenvektor
f:Gewichteter Durchschnitt des Forward-Encoders, berechnet nach dem Aufmerksamkeitsmodell
b:Gewichteter Durchschnitt des Rückwärtscodierers, berechnet durch das Aufmerksamkeitsmodell
Returns:
Wörterbuch der Vokabulargröße, aktualisierter interner Speicher, aktualisierter Zwischenvektor
"""
e = F.tanh(self.ye(y)) #Konvertieren Sie Wörter in Wortvektoren
c, h = F.lstm(c, self.eh(e) + self.hh(h) + self.fh(f) + self.bh(b)) #LSTM mit Wortvektor, Zwischenvektor des Decoders, Aufmerksamkeit des Vorwärtscodierers, Aufmerksamkeit des Rückwärtscodierers
t = self.ey(F.tanh(self.he(h))) #Konvertieren Sie die von LSTM ausgegebene Zwischenvektorversion in einen lexikalischen Größenvektor
return t, c, h
#Achtung Modellklasse
class Attention(chainer.Chain):
def __init__(self, hidden_size):
"""Aufmerksamkeitsinstanziierung
Args:
hidden_size:Versteckte Ebenengröße
"""
super(Attention, self).__init__(
fh = L.Linear(hidden_size, hidden_size), #Eine lineare Kopplungsschicht, die den Vorwärtscodierer-Zwischenvektor in einen verborgenen Schichtgrößenvektor umwandelt
bh = L.Linear(hidden_size, hidden_size), #Eine lineare Kopplungsschicht, die den Zwischenvektor des Rückwärtscodierers in einen verborgenen Schichtgrößenvektor umwandelt
hh = L.Linear(hidden_size, hidden_size), #Eine lineare Kopplungsschicht, die den Zwischenvektor des Decoders in einen versteckten Schichtgrößenvektor umwandelt
hw = L.Linear(hidden_size, 1), #Lineare Kopplungsschicht zum Konvertieren versteckter Schichtgrößenvektoren in Skalare
)
self.hidden_size = hidden_size #Merken Sie sich die Größe der ausgeblendeten Ebene
def __call__(self, fs, bs, h):
"""Aufmerksamkeitsberechnung
Args:
fs:Eine Liste von Forward-Encoder-Zwischenvektoren
bs:Liste der Zwischenvektoren des Umkehrcodierers
h:Zwischenvektorausgabe durch Decoder
Returns:
Gewichteter Durchschnitt des Zwischenvektors des Vorwärtscodierers, gewichteter Durchschnitt des Zwischenvektors des Rückwärtscodierers
"""
batch_size = h.data.shape[0] #Denken Sie an die Größe der Mini-Charge
ws = [] #Initialisieren der Liste zum Aufzeichnen von Gewichten
sum_w = chainer.Variable(xp.zeros((batch_size, 1), dtype='float32')) #Initialisieren Sie den Wert, um den Gesamtwert des Gewichts zu berechnen
#Gewichtsberechnung unter Verwendung des Zwischenvektors des Encoders und des Zwischenvektors des Decoders
for f, b in zip(fs, bs):
w = F.tanh(self.fh(f)+self.bh(b)+self.hh(h)) #Gewichtsberechnung mit Vorwärtscodierer-Zwischenvektor, Rückwärtscodierer-Zwischenvektor, Decoder-Zwischenvektor
w = F.exp(self.hw(w)) #Normalisieren Sie mit der Softmax-Funktion
ws.append(w) #Notieren Sie das berechnete Gewicht
sum_w += w
#Initialisierung des ausgegebenen gewichteten Durchschnittsvektors
att_f = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
att_b = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
for f, b, w in zip(fs, bs, ws):
w /= sum_w #Normalisiert, so dass die Summe der Gewichte 1 ist.
#Gewicht*Fügen Sie den Zwischenvektor von Encoder zum Ausgabevektor hinzu
att_f += F.reshape(F.batch_matmul(f, w), (batch_size, self.hidden_size))
att_b += F.reshape(F.batch_matmul(b, w), (batch_size, self.hidden_size))
return att_f, att_b
#Achtung Sequenz zu Sequenzmodellklasse
class AttSeq2Seq(chainer.Chain):
def __init__(self, vocab_size, embed_size, hidden_size, batch_col_size):
"""Attention +Seq2 Seq Instanziierung
Args:
vocab_size:Anzahl der Vokabeln
embed_size:Wortvektorgröße
hidden_size:Versteckte Ebenengröße
"""
super(AttSeq2Seq, self).__init__(
f_encoder = LSTMEncoder(vocab_size, embed_size, hidden_size), #Vorwärtscodierer
b_encoder = LSTMEncoder(vocab_size, embed_size, hidden_size), #Rückwärtsgeber
attention = Attention(hidden_size), # Attention Model
decoder = AttLSTMDecoder(vocab_size, embed_size, hidden_size) # Decoder
)
self.vocab_size = vocab_size
self.embed_size = embed_size
self.hidden_size = hidden_size
self.decode_max_size = batch_col_size #Die Dekodierung endet, wenn EOS ausgegeben wird, die maximale Anzahl von Ausgabevokabeln, wenn sie nicht ausgegeben wird
#Initialisieren Sie die Liste, um den Vorwärtscodierer-Zwischenvektor und den Rückwärtscodierer-Zwischenvektor zu speichern
self.fs = []
self.bs = []
def encode(self, words, batch_size):
"""Geberberechnung
Args:
words:Eine aufgezeichnete Liste von Wörtern, die für Ihre Eingabe verwendet werden sollen
batch_size:Mini-Chargengröße
"""
c = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
h = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
#Forward Encoder Berechnung
for w in words:
c, h = self.f_encoder(w, c, h)
self.fs.append(h) #Notieren Sie den berechneten Zwischenvektor
#Interner Speicher, Initialisierung des Zwischenvektors
c = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
h = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
#Reverse Encoder Berechnung
for w in reversed(words):
c, h = self.b_encoder(w, c, h)
self.bs.insert(0, h) #Notieren Sie den berechneten Zwischenvektor
#Interner Speicher, Initialisierung des Zwischenvektors
self.c = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
self.h = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
def decode(self, w):
"""Decoderberechnung
Args:
w:Mit Decoder einzugebende Wörter
Returns:
Voraussagendes Wort
"""
att_f, att_b = self.attention(self.fs, self.bs, self.h)
t, self.c, self.h = self.decoder(w, self.c, self.h, att_f, att_b)
return t
def reset(self):
"""Instanzvariablen initialisieren
"""
#Initialisierung der Liste, die den Zwischenvektor von Encoder aufzeichnet
self.fs = []
self.bs = []
#Gradienteninitialisierung
self.zerograds()
def __call__(self, enc_words, dec_words=None, train=True):
"""Eine Funktion, die die Vorwärtsausbreitung berechnet
Args:
enc_words:Eine Liste der gesprochenen Wörter
dec_words:Eine Liste der Wörter im Antwortsatz
train:Lernen oder Vorhersage
Returns:
Gesamtverlust berechnete oder vorhergesagte Decodierungszeichenfolge
"""
enc_words = enc_words.T
if train:
dec_words = dec_words.T
batch_size = len(enc_words[0]) #Chargengröße aufzeichnen
self.reset() #Setzen Sie den im Modell gespeicherten Farbverlauf zurück
enc_words = [chainer.Variable(xp.array(row, dtype='int32')) for row in enc_words] #Ändern Sie die Wörter in der Äußerungsliste in Variablentyp
self.encode(enc_words, batch_size) #Kodierungsberechnung
t = chainer.Variable(xp.array([0 for _ in range(batch_size)], dtype='int32')) # <eos>Zum Decoder
loss = chainer.Variable(xp.zeros((), dtype='float32')) #Verlustinitialisierung
ys = [] #Eine Liste der vom Decoder generierten Wörter
#Decoderberechnung
if train: #Berechnen Sie den Lernverlust
for w in dec_words:
y = self.decode(t) #Wort für Wort dekodieren
t = chainer.Variable(xp.array(w, dtype='int32')) #Konvertieren Sie das richtige Wort in den Variablentyp
loss += F.softmax_cross_entropy(y, t) #Berechnen Sie den Verlust, indem Sie das richtige Wort mit dem vorhergesagten Wort vergleichen
return loss
else: #Generieren Sie eine decodierte Zeichenfolge zur Vorhersage
for i in range(self.decode_max_size):
y = self.decode(t)
y = xp.argmax(y.data) #Da es immer noch mit Wahrscheinlichkeit ausgegeben wird, erhalten Sie das vorhergesagte Wort mit hoher Wahrscheinlichkeit
ys.append(y)
t = chainer.Variable(xp.array([y], dtype='int32'))
if y == 0: #Wenn EOS ausgegeben wird, ist die Decodierung beendet.
break
return ys
#Lernen
#Konstante
embed_size = 100
hidden_size = 100
batch_size = 6 #Anzahl der Stapelgrößen für das Lernen in kleinen Stapeln
batch_col_size = 15
epoch_num = 50 #Anzahl der Epochen
N = len(data) #Anzahl der Lehrerdaten
#Lehrerdaten lesen
data_converter = DataConverter(batch_col_size=batch_col_size) #Datenkonverter
data_converter.load(data) #Lesen Sie die Lehrerdaten
vocab_size = len(data_converter.vocab) #Anzahl der Wörter
#Modelldeklaration
model = AttSeq2Seq(vocab_size=vocab_size, embed_size=embed_size, hidden_size=hidden_size, batch_col_size=batch_col_size)
opt = chainer.optimizers.Adam()
opt.setup(model)
opt.add_hook(chainer.optimizer.GradientClipping(5))
if gpu >= 0:
model.to_gpu(gpu)
model.reset()
#Lernen
st = datetime.datetime.now()
for epoch in range(epoch_num):
#Mini-Batch-Lernen
perm = np.random.permutation(N) #Holen Sie sich eine zufällige Liste von Ganzzahlspalten
total_loss = 0
for i in range(0, N, batch_size):
enc_words = data_converter.train_queries[perm[i:i+batch_size]]
dec_words = data_converter.train_responses[perm[i:i+batch_size]]
model.reset()
loss = model(enc_words=enc_words, dec_words=dec_words, train=True)
loss.backward()
loss.unchain_backward()
total_loss += loss.data
opt.update()
if (epoch+1)%10 == 0:
ed = datetime.datetime.now()
print("epoch:\t{}\ttotal loss:\t{}\ttime:\t{}".format(epoch+1, total_loss, ed-st))
st = datetime.datetime.now()
def predict(model, query):
enc_query = data_converter.sentence2ids(query, train=False)
dec_response = model(enc_words=enc_query, train=False)
response = data_converter.ids2words(dec_response)
print(query, "=>", response)
#Wählen Sie aus einer Datenliste, um das Lernen zu bestätigen
predict(model, str(data[0][0]))
Trennen Sie "obere Phrase" und "mittlere Phrase, untere Phrase" in Registerkarten Die folgenden Daten sind Haiku-Suche von Haijin Association / Haiku Literature Museum ) Von
test_data.txt
Nennen Sie es unbedingt ein Huhn
Am ersten Tag frage ich mich, ob es eine Erdnuss ist
Lachs, der zum Jahresende Salz verschüttet
Manyoshu Neujahrsgedicht
Nicht lang und nicht kurz
Um mit dem neuen Licht umzugehen
Ich frage mich, ob es der erste Tag ist, den ich noch nicht getan habe
Rechts Links Großer Spiegel erste Übung
Nehmen Sie Ihre Schultern ab und akzeptieren Sie den Rettich
Drei Tage werden Kokaku in die Augen geschossen
Shinonome no Miya Ruins Masaden Erste Ansicht
-----------------------------
[[['Mit Hühnchen'], ['Gehen Sie unbedingt nach Isuzugawa']], [['Erster Tag'], ['Federgrün und Pfau']], [['Lachs zum Jahresende'], ['Ah, Salz verschütten']], [['Manyoshu'], ['Neujahrsgedicht']], [['Nicht lange'], ['Keine kurze Frage']], [['Erstes Licht'], ['Sich neuen Herzen stellen']], [['Immer noch'], ['Ich frage mich, ob es der erste Tag ist, an dem ich nichts zu tun habe']], [['Rechts links'], ['Großer Spiegel erste Übung']], [['Nimm deine Schulter ab'], ['Akzeptiere den Rettich']], [['3 Tage'], ['Schießen Sie in die Augen der Rolle kleinen Winkel']], [['Shinomeno'], ['Der erste Blick auf den Hauptschrein']]]
-----------------------------
epoch: 10 total loss: 53.870222091674805 time: 0:00:12.398713
epoch: 20 total loss: 30.608291625976562 time: 0:00:12.503922
epoch: 30 total loss: 13.965360641479492 time: 0:00:12.470424
epoch: 40 total loss: 6.161568880081177 time: 0:00:12.560850
epoch: 50 total loss: 2.741897940635681 time: 0:00:12.466510
['Mit Hühnchen'] => ['Ihe', 'Wenn', 'Sie müssen', 'Isuzu Fluss', '<eos>']