Als ich in meiner Arbeit Anomalieerkennungsmethoden untersuchte, fand ich eine Methode namens EfficientGAN, aber der Quellcode des Autors beschrieb nicht die Version der Bibliothek und es war schwierig auszuführen, so dass ich sie auch mit Keras zum Lernen implementierte. .. Außerdem wurde nur das Netzwerk für Tabellendaten implementiert, und ein "Merkmalsanpassungsverlust" in der Verlustberechnung zum Zeitpunkt der Inferenz ist nicht implementiert.
Quellcode: https://github.com/asm94/EfficientGAN
↓ Referenziert Originalarbeit: https://arxiv.org/abs/1802.06222 Quellcode des Autors: https://github.com/houssamzenati/Efficient-GAN-Anomaly-Detection Kommentarartikel: https://qiita.com/masataka46/items/49dba2790fa59c29126b
・ Windows10 64bit ・ Python 3.8.3 ・ Numpy 1.18.5 ・ Tensorflow 2.3.1 ・ Scikit-Learn 0.22.2
Dieses Mal haben wir die Netzwerk- und Lern- und Inferenzfunktionen von EfficientGAN als eine Klasse definiert. Das ganze Bild ist wie folgt. Die einzelnen Funktionen werden später beschrieben.
class EfficientGAN(object):
def __init__(self, input_dim=0, latent_dim=32):
self.input_dim = int(input_dim)
self.latent_dim = int(latent_dim)
#Train model
def fit(self, X_train, epochs=50, batch_size=50, loss=tf.keras.losses.BinaryCrossentropy(),
optimizer=tf.keras.optimizers.Adam(lr=1e-5, beta_1=0.5), test=tuple(), early_stop_num=50,
verbose=1):
#Siehe unten
#Test model
def predict(self, X_test, weight=0.9, degree=1):
#Siehe unten
##Encoder
def get_encoder(self, initializer=tf.keras.initializers.GlorotUniform()):
#Siehe unten
##Generator
def get_generator(self, initializer=tf.keras.initializers.GlorotUniform()):
#Siehe unten
##Discriminator
def get_discriminator(self, initializer=tf.keras.initializers.GlorotUniform()):
#Siehe unten
Ich habe es wie folgt unter Bezugnahme auf das Papier implementiert.
##Encoder
def get_encoder(self, initializer=tf.keras.initializers.GlorotUniform()):
inputs = Input(shape=(self.input_dim,), name='input')
net = inputs
net = Dense(64, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
name='layer_1')(net)
outputs = Dense(self.latent_dim, activation='linear', kernel_initializer=initializer,
name='output')(net)
return Model(inputs=inputs, outputs=outputs, name='Encoder')
##Generator
def get_generator(self, initializer=tf.keras.initializers.GlorotUniform()):
inputs = Input(shape=(self.latent_dim,), name='input')
net = inputs
net = Dense(64, activation='relu', kernel_initializer=initializer,
name='layer_1')(net)
net = Dense(128, activation='relu', kernel_initializer=initializer,
name='layer_2')(net)
outputs = Dense(self.input_dim, activation='linear', kernel_initializer=initializer,
name='output')(net)
return Model(inputs=inputs, outputs=outputs, name='Generator')
##Discriminator
def get_discriminator(self, initializer=tf.keras.initializers.GlorotUniform()):
#D(x)
inputs1 = Input(shape=(self.input_dim,), name='real')
net = inputs1
net = Dense(128, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
name='layer_1')(net)
dx = Dropout(.2)(net)
#D(z)
inputs2 = Input(shape=(self.latent_dim,), name='noise')
net = inputs2
net = Dense(128, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
name='layer_2')(net)
dz = Dropout(.2)(net)
#D(x)Und D.(z)Kombinieren
conet = Concatenate(axis=1)([dx,dz])
#D(x,z)
conet = Dense(128, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
name='layer_3')(conet)
conet = Dropout(.2)(conet)
outputs = Dense(1, activation='sigmoid', kernel_initializer=initializer,
name='output')(conet)
return Model(inputs=[inputs1,inputs2], outputs=outputs, name='Discriminator')
Ich habe es wie folgt unter Bezugnahme auf das Papier implementiert. -Im Quellcode des Autors wird die Konvertierung durch die Sigmoid-Funktion unmittelbar vor der Verlustberechnung durchgeführt. Wie in Abschnitt 2 beschrieben, wird die Konvertierung durch die Sigmoid-Funktion in das Netzwerk integriert, sodass sie hier nicht konvertiert wird. -Jedes Teilmodell wie Discriminator wird zum Zeitpunkt des Lernens definiert, nicht zum Zeitpunkt der Definition der EfficientGAN-Klasse. Wenn die Anzahl der Eingabedimensionen nicht definiert ist, wird die Anzahl der Dimensionen der Trainingsdaten zu diesem Zeitpunkt auf die Anzahl der Eingabedimensionen festgelegt.
#Train model
def fit(self, X_train, epochs=50, batch_size=50, loss=tf.keras.losses.BinaryCrossentropy(),
optimizer=tf.keras.optimizers.Adam(lr=1e-5, beta_1=0.5), test=tuple(), early_stop_num=50,
verbose=1):
#Konvertieren Sie Trainingsdaten in Numpy-Typen
X_train = np.array(X_train)
#"input_dim"Wenn nicht 1 oder mehr (vorausgesetzt, undefiniert), legen Sie die Anzahl der Dimensionen der Trainingsdaten fest
if not self.input_dim >= 1: self.input_dim = X_train.shape[1]
#Definition des Diskriminatormodells
self.dis = self.get_discriminator()
self.dis.compile(loss=loss, optimizer=optimizer)
#Modelldefinition für das Lernen von Encodern (Encoder → Diskriminator)
self.enc = self.get_encoder()
x = Input(shape=(self.input_dim,))
z_gen = self.enc(x)
valid = self.dis([x, z_gen])
enc_dis = Model(inputs=x, outputs=valid, name='enc_to_dis')
enc_dis.compile(loss=loss, optimizer=optimizer)
#Modelldefinition für das Lernen des Generators (Generator → Diskriminator)
self.gen = self.get_generator()
z = Input(shape=(self.latent_dim,))
x_gen = self.gen(z)
valid = self.dis([x_gen, z])
gen_dis = Model(inputs=z, outputs=valid, name='gen_to_dis')
gen_dis.compile(loss=loss, optimizer=optimizer)
#Training
min_val_loss = float('inf')
stop_count = 0
for i in range(epochs):
#Diskriminator mit aktivierter Lernfunktion
self.dis.trainable = True
#Aus den Trainingsdaten"batch_size"Holen Sie sich zufällig die Hälfte von
idx = np.random.randint(0, X_train.shape[0], batch_size//2)
real_data = X_train[idx]
#"batch_size"Es wird nur die Hälfte des Rauschens erzeugt, und aus jedem erzeugten Rauschen werden Daten erzeugt.
noise = np.random.normal(0, 1, (len(idx), self.latent_dim))
gen_data = self.gen.predict(noise)
#Erzeugen Sie aus den erfassten Trainingsdaten Rauschen
enc_noise = self.enc.predict(real_data)
#Diskriminator lernen
d_enc_loss = self.dis.train_on_batch([real_data, enc_noise], np.ones((len(real_data), 1)))
d_gen_loss = self.dis.train_on_batch([gen_data, noise], np.zeros((len(gen_data), 1)))
d_loss = d_enc_loss + d_gen_loss
#Schalten Sie die Lernfunktion von Discriminator aus
self.dis.trainable = False
#Encoder lernen
e_loss = enc_dis.train_on_batch(real_data, np.zeros((len(real_data), 1)))
#Generator lernen
g_loss = gen_dis.train_on_batch(noise, np.ones((len(noise), 1)))
#Wenn Bewertungsdaten eingestellt sind, Verlustberechnung der Daten und frühzeitige Stoppprüfung
if len(test)>0:
#Erfassung von Bewertungsdaten
X_test = test[0]
y_true = test[1]
#Rückschluss auf Bewertungsdaten
proba = self.predict(X_test)
proba = minmax_scale(proba)
#Verlustberechnung
val_loss = tf.keras.losses.binary_crossentropy(y_true, proba).numpy()
#Wenn der Verlust der Auswertungsdaten stärker gedämpft wird als zuvor, aktualisieren Sie den minimalen Verlust und setzen Sie die Anzahl der frühen Stopps zurück
if min_val_loss > val_loss:
min_val_loss = val_loss #Update "min_val_loss" to "val_loss"
stop_count = 0 #Change "stop_count" to 0
#If "stop_count" is equal or more than "early_stop_num", training is end
#Wenn der Verlust von Bewertungsdaten nicht innerhalb der angegebenen Anzahl von Malen abnimmt, wird das Lernen gestoppt
elif stop_count >= early_stop_num:
break
else:
stop_count += 1
#Anzeige des Lernstatus
if verbose==1 and i%100==0:
if len(test)==0: print(f'epoch{i}-> d_loss:{d_loss} e_loss:{e_loss} g_loss:{g_loss}')
else: print(f'epoch{i}-> d_loss:{d_loss} e_loss:{e_loss} g_loss:{g_loss} val_loss:{val_loss}')
Ich habe es wie folgt unter Bezugnahme auf das Papier implementiert. Wie in der Veröffentlichung gezeigt, wird die abnormale Bewertung nach der folgenden Formel berechnet (je höher der Wert, desto abnormaler).
A(x)=αL_G(x)+(1-α)L_D(x)・ ・ ・ Anomalie-Punktzahl
L_G(x)=||x-G(E(x))||_1 ・ ・ ・ Generatorverlust
L_D(x)=σ(D(x,E(x)),1)・ ・ ・ Diskriminatorverlust
Im Quellcode des Autors lautet DiscriminatorLoss übrigens wie folgt, und ich habe mich gefragt, welchen ich mit dem Inhalt des Papiers aufnehmen soll, aber diesmal habe ich ihn mit der obigen Formel wie im Papier implementiert.
L_D(x)=σ(D(G(E(x)),E(x)),1)
#Test model
def predict(self, X_test, weight=0.9, degree=1):
#Konvertieren Sie die Auswertungsdaten in den Numpy-Typ
X_test = np.array(X_test)
#Geräuschentwicklung aus Bewertungsdaten
z_gen = self.enc.predict(X_test)
#Generieren Sie Daten erneut mit Rauschen, das aus Auswertungsdaten generiert wird
reconstructs = self.gen.predict(z_gen)
#Berechnen Sie die Differenz zwischen den Originaldaten und den regenerierten Daten für jede erklärende Variable und addieren Sie sie
#Wenn die Daten den Trainingsdaten ähnlich sind, sollten Sie in der Lage sein, die Eingabedaten mit dem Encoder und Generator neu zu generieren, die Sie gut gelernt haben.
delta = X_test - reconstructs
gen_score = tf.norm(delta, ord=degree, axis=1).numpy()
#Infer Encoder Ein- / Ausgang mit Discriminator
l_encoder = self.dis.predict([X_test, z_gen])
#Berechnen Sie die Kreuzentropie zwischen dem obigen Inferenzergebnis und allen 1 Sequenzen
#Wenn die Daten den Trainingsdaten ähnlich sind, sollte das Ergebnis der Schlussfolgerung der Eingabe / Ausgabe des Encoders mit dem Diskriminator 1 sein.
dis_score = tf.keras.losses.binary_crossentropy(np.ones((len(X_test), 1)), l_encoder).numpy()
#Return anomality calculated "gen_score" and "dis_score"
return weight*gen_score + (1-weight)*dis_score
Vielen Dank für Ihren Besuch auf unserer Website. Wenn Sie irgendwelche Bedenken haben, würde ich es begrüßen, wenn Sie darauf hinweisen könnten.
Recommended Posts