Lorsque j'étudiais les méthodes de détection d'anomalies dans mon travail, j'ai trouvé une méthode appelée EfficientGAN, mais le code source de l'auteur était difficile à exécuter car la version de la bibliothèque n'était pas décrite, donc je l'ai implémentée avec des keras pour l'étude également. .. De plus, seul le réseau pour les données de table a été mis en œuvre, et la «perte d'appariement de caractéristiques» dans le calcul de la perte au moment de l'inférence n'est pas mise en œuvre.
Code source: https://github.com/asm94/EfficientGAN
↓ référencé Papier original: https://arxiv.org/abs/1802.06222 Code source de l'auteur: https://github.com/houssamzenati/Efficient-GAN-Anomaly-Detection Commentaire de l'article: https://qiita.com/masataka46/items/49dba2790fa59c29126b
・ Windows10 64 bits ・ Python 3.8.3 ・ Numpy 1.18.5 ・ Tensorflow 2.3.1 ・ Scikit-learn 0.22.2
Cette fois, nous avons défini le réseau et les fonctions d'apprentissage et d'inférence d'EfficientGAN comme une seule classe. L'image entière est la suivante. Les fonctions individuelles seront décrites plus loin.
class EfficientGAN(object):
def __init__(self, input_dim=0, latent_dim=32):
self.input_dim = int(input_dim)
self.latent_dim = int(latent_dim)
#Train model
def fit(self, X_train, epochs=50, batch_size=50, loss=tf.keras.losses.BinaryCrossentropy(),
optimizer=tf.keras.optimizers.Adam(lr=1e-5, beta_1=0.5), test=tuple(), early_stop_num=50,
verbose=1):
#Voir ci-dessous
#Test model
def predict(self, X_test, weight=0.9, degree=1):
#Voir ci-dessous
##Encoder
def get_encoder(self, initializer=tf.keras.initializers.GlorotUniform()):
#Voir ci-dessous
##Generator
def get_generator(self, initializer=tf.keras.initializers.GlorotUniform()):
#Voir ci-dessous
##Discriminator
def get_discriminator(self, initializer=tf.keras.initializers.GlorotUniform()):
#Voir ci-dessous
Je l'ai implémenté comme suit en référence au papier.
##Encoder
def get_encoder(self, initializer=tf.keras.initializers.GlorotUniform()):
inputs = Input(shape=(self.input_dim,), name='input')
net = inputs
net = Dense(64, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
name='layer_1')(net)
outputs = Dense(self.latent_dim, activation='linear', kernel_initializer=initializer,
name='output')(net)
return Model(inputs=inputs, outputs=outputs, name='Encoder')
##Generator
def get_generator(self, initializer=tf.keras.initializers.GlorotUniform()):
inputs = Input(shape=(self.latent_dim,), name='input')
net = inputs
net = Dense(64, activation='relu', kernel_initializer=initializer,
name='layer_1')(net)
net = Dense(128, activation='relu', kernel_initializer=initializer,
name='layer_2')(net)
outputs = Dense(self.input_dim, activation='linear', kernel_initializer=initializer,
name='output')(net)
return Model(inputs=inputs, outputs=outputs, name='Generator')
##Discriminator
def get_discriminator(self, initializer=tf.keras.initializers.GlorotUniform()):
#D(x)
inputs1 = Input(shape=(self.input_dim,), name='real')
net = inputs1
net = Dense(128, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
name='layer_1')(net)
dx = Dropout(.2)(net)
#D(z)
inputs2 = Input(shape=(self.latent_dim,), name='noise')
net = inputs2
net = Dense(128, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
name='layer_2')(net)
dz = Dropout(.2)(net)
#D(x)Et D(z)Combiner
conet = Concatenate(axis=1)([dx,dz])
#D(x,z)
conet = Dense(128, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
name='layer_3')(conet)
conet = Dropout(.2)(conet)
outputs = Dense(1, activation='sigmoid', kernel_initializer=initializer,
name='output')(conet)
return Model(inputs=[inputs1,inputs2], outputs=outputs, name='Discriminator')
Je l'ai implémenté comme suit en référence au papier. -Dans le code source de l'auteur, la conversion par la fonction sigmoïde est effectuée immédiatement avant le calcul de la perte, mais comme décrit dans la section 2, la conversion par la fonction sigmoïde est incorporée dans le réseau, elle n'est donc pas convertie ici. -Chaque modèle partiel tel que Discriminator est défini au moment de l'apprentissage, pas au moment de la définition de la classe EfficientGAN, et si le nombre de dimensions d'entrée n'est pas défini, le nombre de dimensions des données d'apprentissage est défini sur le nombre de dimensions d'entrée à ce moment.
#Train model
def fit(self, X_train, epochs=50, batch_size=50, loss=tf.keras.losses.BinaryCrossentropy(),
optimizer=tf.keras.optimizers.Adam(lr=1e-5, beta_1=0.5), test=tuple(), early_stop_num=50,
verbose=1):
#Convertir les données d'entraînement en type numpy
X_train = np.array(X_train)
#"input_dim"Si n'est pas 1 ou plus (en supposant que non défini), définissez le nombre de dimensions des données d'entraînement
if not self.input_dim >= 1: self.input_dim = X_train.shape[1]
#Définition du modèle de discriminateur
self.dis = self.get_discriminator()
self.dis.compile(loss=loss, optimizer=optimizer)
#Définition du modèle pour l'apprentissage de l'encodeur (Encodeur → Discriminateur)
self.enc = self.get_encoder()
x = Input(shape=(self.input_dim,))
z_gen = self.enc(x)
valid = self.dis([x, z_gen])
enc_dis = Model(inputs=x, outputs=valid, name='enc_to_dis')
enc_dis.compile(loss=loss, optimizer=optimizer)
#Définition de modèle pour l'apprentissage du générateur (Générateur → Discriminateur)
self.gen = self.get_generator()
z = Input(shape=(self.latent_dim,))
x_gen = self.gen(z)
valid = self.dis([x_gen, z])
gen_dis = Model(inputs=z, outputs=valid, name='gen_to_dis')
gen_dis.compile(loss=loss, optimizer=optimizer)
#Training
min_val_loss = float('inf')
stop_count = 0
for i in range(epochs):
#Discriminateur avec fonction d'apprentissage activée
self.dis.trainable = True
#À partir des données d'entraînement"batch_size"Obtenez au hasard la moitié de
idx = np.random.randint(0, X_train.shape[0], batch_size//2)
real_data = X_train[idx]
#"batch_size"Seule la moitié du bruit est générée et des données sont générées à partir de chaque bruit généré.
noise = np.random.normal(0, 1, (len(idx), self.latent_dim))
gen_data = self.gen.predict(noise)
#Générer du bruit à partir de chaque donnée d'entraînement acquise
enc_noise = self.enc.predict(real_data)
#Apprentissage des discriminateurs
d_enc_loss = self.dis.train_on_batch([real_data, enc_noise], np.ones((len(real_data), 1)))
d_gen_loss = self.dis.train_on_batch([gen_data, noise], np.zeros((len(gen_data), 1)))
d_loss = d_enc_loss + d_gen_loss
#Désactivez la fonction d'apprentissage de Discriminator
self.dis.trainable = False
#Apprentissage de l'encodeur
e_loss = enc_dis.train_on_batch(real_data, np.zeros((len(real_data), 1)))
#Apprentissage du générateur
g_loss = gen_dis.train_on_batch(noise, np.ones((len(noise), 1)))
#S'il y a réglage des données d'évaluation, calcul de la perte des données et prise en compte d'un arrêt anticipé
if len(test)>0:
#Acquisition des données d'évaluation
X_test = test[0]
y_true = test[1]
#Inférence des données d'évaluation
proba = self.predict(X_test)
proba = minmax_scale(proba)
#calcul des pertes
val_loss = tf.keras.losses.binary_crossentropy(y_true, proba).numpy()
#Si la perte des données d'évaluation est plus atténuée qu'auparavant, mettez à jour la perte minimale et réinitialisez le décompte des arrêts anticipés
if min_val_loss > val_loss:
min_val_loss = val_loss #Update "min_val_loss" to "val_loss"
stop_count = 0 #Change "stop_count" to 0
#If "stop_count" is equal or more than "early_stop_num", training is end
#Si la perte de données d'évaluation ne diminue pas dans le nombre de fois spécifié, l'apprentissage s'arrête
elif stop_count >= early_stop_num:
break
else:
stop_count += 1
#Affichage de l'état d'apprentissage
if verbose==1 and i%100==0:
if len(test)==0: print(f'epoch{i}-> d_loss:{d_loss} e_loss:{e_loss} g_loss:{g_loss}')
else: print(f'epoch{i}-> d_loss:{d_loss} e_loss:{e_loss} g_loss:{g_loss} val_loss:{val_loss}')
Je l'ai implémenté comme suit en référence au papier. Comme le montre l'article, le score anormal est calculé par la formule suivante (plus la valeur est élevée, plus elle est anormale).
A(x)=αL_G(x)+(1-α)L_D(x)・ ・ ・ Score d'anomalie
L_G(x)=||x-G(E(x))||_1 ・ ・ ・ Perte de générateur
L_D(x)=σ(D(x,E(x)),1)・ ・ ・ Perte du discriminateur
Au fait, dans le code source de l'auteur, DiscriminatorLoss est le suivant, et je me demandais lequel prendre avec le contenu de l'article, mais cette fois je l'ai implémenté avec la formule ci-dessus comme dans l'article.
L_D(x)=σ(D(G(E(x)),E(x)),1)
#Test model
def predict(self, X_test, weight=0.9, degree=1):
#Convertir les données d'évaluation en type numpy
X_test = np.array(X_test)
#Génération de bruit à partir des données d'évaluation
z_gen = self.enc.predict(X_test)
#Générez à nouveau des données avec le bruit généré à partir des données d'évaluation
reconstructs = self.gen.predict(z_gen)
#Calculez la différence entre les données d'origine et les données régénérées pour chaque variable explicative et additionnez-les
#Si les données sont similaires aux données d'entraînement, vous devriez pouvoir régénérer les données d'entrée avec l'encodeur et le générateur que vous avez bien appris.
delta = X_test - reconstructs
gen_score = tf.norm(delta, ord=degree, axis=1).numpy()
#Infer Encoder entrée / sortie avec Discriminator
l_encoder = self.dis.predict([X_test, z_gen])
#Calculez l'entropie croisée entre le résultat d'inférence ci-dessus et les 1 séquences
#Si les données sont similaires aux données d'apprentissage, le résultat de la déduction de l'entrée / sortie de l'encodeur avec le discriminateur doit être 1.
dis_score = tf.keras.losses.binary_crossentropy(np.ones((len(X_test), 1)), l_encoder).numpy()
#Return anomality calculated "gen_score" and "dis_score"
return weight*gen_score + (1-weight)*dis_score
Merci de visiter notre site. Si vous avez des inquiétudes, je vous serais reconnaissant de bien vouloir le signaler.
Recommended Posts