[PYTHON] Implemented EfficientGAN with Keras

Overview

While looking into anomaly detection methods for work, I came across a method called EfficientGAN. The authors' source code does not state which library versions it needs and was difficult to run, so I reimplemented it in Keras, partly as a learning exercise. Note that only the network for tabular data is implemented, and the "feature-matching loss" option for the loss calculation at inference time is not implemented.

Source code: https://github.com/asm94/EfficientGAN

References:
・ Original paper: https://arxiv.org/abs/1802.06222
・ Authors' source code: https://github.com/houssamzenati/Efficient-GAN-Anomaly-Detection
・ Commentary article: https://qiita.com/masataka46/items/49dba2790fa59c29126b

Execution environment

・ Windows 10 64bit
・ Python 3.8.3
・ NumPy 1.18.5
・ TensorFlow 2.3.1
・ scikit-learn 0.22.2

Implementation

1. Overall configuration (class)

The EfficientGAN networks and the training and inference functions are defined together in a single class. The overall structure is shown below; the individual functions are described in the following sections.

import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, LeakyReLU, Dropout, Concatenate
from tensorflow.keras.models import Model
from sklearn.preprocessing import minmax_scale

class EfficientGAN(object):
    def __init__(self, input_dim=0, latent_dim=32):
        self.input_dim = int(input_dim)
        self.latent_dim = int(latent_dim)

    #Train model
    def fit(self, X_train, epochs=50, batch_size=50, loss=tf.keras.losses.BinaryCrossentropy(),
            optimizer=tf.keras.optimizers.Adam(lr=1e-5, beta_1=0.5), test=tuple(), early_stop_num=50,
            verbose=1):
        pass #See section 3

    #Test model
    def predict(self, X_test, weight=0.9, degree=1):
        pass #See section 4

    ##Encoder
    def get_encoder(self, initializer=tf.keras.initializers.GlorotUniform()):
        pass #See section 2

    ##Generator
    def get_generator(self, initializer=tf.keras.initializers.GlorotUniform()):
        pass #See section 2

    ##Discriminator
    def get_discriminator(self, initializer=tf.keras.initializers.GlorotUniform()):
        pass #See section 2

2. Network

I implemented the networks as follows, with reference to the paper.
・ In the paper, "input_dim" is 121, the number of dimensions of the data used there. Here it can be set to any value.
・ In the paper, the activation function of the Discriminator's output layer is linear. In the authors' source code, however, the output is passed through a sigmoid function at loss-calculation time, so here the sigmoid is built into the network as the output activation.

##Encoder
def get_encoder(self, initializer=tf.keras.initializers.GlorotUniform()):
    inputs = Input(shape=(self.input_dim,), name='input')
    net = inputs
    net = Dense(64, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
                name='layer_1')(net)
    outputs = Dense(self.latent_dim, activation='linear', kernel_initializer=initializer,
                    name='output')(net)
    
    return Model(inputs=inputs, outputs=outputs, name='Encoder')
    
##Generator
def get_generator(self, initializer=tf.keras.initializers.GlorotUniform()):
    inputs = Input(shape=(self.latent_dim,), name='input')
    net = inputs
    net = Dense(64, activation='relu', kernel_initializer=initializer,
                name='layer_1')(net)
    net = Dense(128, activation='relu', kernel_initializer=initializer,
                name='layer_2')(net)
    outputs = Dense(self.input_dim, activation='linear', kernel_initializer=initializer,
                    name='output')(net)
    
    return Model(inputs=inputs, outputs=outputs, name='Generator')
    
##Discriminator
def get_discriminator(self, initializer=tf.keras.initializers.GlorotUniform()):
    #D(x)
    inputs1 = Input(shape=(self.input_dim,), name='real')
    net = inputs1
    net = Dense(128, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
                name='layer_1')(net)
    dx = Dropout(.2)(net)
    
    #D(z)
    inputs2 = Input(shape=(self.latent_dim,), name='noise')
    net = inputs2
    net = Dense(128, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
                name='layer_2')(net)
    dz = Dropout(.2)(net)
    
    #Concatenate D(x) and D(z)
    conet = Concatenate(axis=1)([dx,dz])
    
    #D(x,z)
    conet = Dense(128, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
                  name='layer_3')(conet)
    conet = Dropout(.2)(conet)
    outputs = Dense(1, activation='sigmoid', kernel_initializer=initializer,
                    name='output')(conet)

    return Model(inputs=[inputs1,inputs2], outputs=outputs, name='Discriminator')
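
The note about moving the sigmoid into the Discriminator's output layer can be checked numerically. The following is a minimal NumPy sketch, not part of the original implementation: the helper names `sigmoid`, `bce_from_probs`, and `bce_from_logits`, the sample logits and labels, and the clipping value `eps` are all assumptions made for illustration. It compares the cross-entropy computed on sigmoid outputs (the approach used in this article) with the cross-entropy computed directly from linear outputs, which is assumed here to correspond to applying the sigmoid at loss-calculation time.

```python
import numpy as np

def sigmoid(logits):
    # Map raw (linear) discriminator outputs to probabilities in (0, 1)
    return 1.0 / (1.0 + np.exp(-logits))

def bce_from_probs(y_true, probs, eps=1e-7):
    # Cross-entropy on probabilities: sigmoid already applied in the network
    probs = np.clip(probs, eps, 1.0 - eps)
    return -(y_true * np.log(probs) + (1.0 - y_true) * np.log(1.0 - probs))

def bce_from_logits(y_true, logits):
    # Cross-entropy on raw outputs, in the numerically stable form
    # max(x, 0) - x*y + log(1 + exp(-|x|))
    return np.maximum(logits, 0.0) - logits * y_true + np.log1p(np.exp(-np.abs(logits)))

# Hypothetical linear outputs and target labels
logits = np.array([-3.0, -0.5, 0.0, 1.2, 4.0])
labels = np.array([0.0, 1.0, 1.0, 0.0, 1.0])

loss_a = bce_from_probs(labels, sigmoid(logits))
loss_b = bce_from_logits(labels, logits)
max_gap = float(np.max(np.abs(loss_a - loss_b)))
print(max_gap)
```

For moderate output values the two losses agree up to floating-point error, so building the sigmoid into the network should not change the training objective. For very large magnitudes the clipping in `bce_from_probs` would make the two diverge slightly.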

3. Model learning

I implemented training as follows, with reference to the paper.
・ In the authors' source code, the sigmoid function is applied immediately before the loss calculation. As described in Section 2, the sigmoid is built into the network here, so no conversion is done at this point.
・ Each partial model, such as the Discriminator, is built when training starts rather than when the EfficientGAN class is instantiated. If the number of input dimensions has not been set, it is taken from the training data at that time.

#Train model
def fit(self, X_train, epochs=50, batch_size=50, loss=tf.keras.losses.BinaryCrossentropy(),
        optimizer=tf.keras.optimizers.Adam(lr=1e-5, beta_1=0.5), test=tuple(), early_stop_num=50,
        verbose=1):
        
    #Convert training data to numpy type
    X_train = np.array(X_train)
        
    #If "input_dim" is less than 1 (treated as undefined), use the number of dimensions of the training data
    if not self.input_dim >= 1: self.input_dim = X_train.shape[1]
        
    #Discriminator model definition
    self.dis = self.get_discriminator()
    self.dis.compile(loss=loss, optimizer=optimizer)        
        
    #Model definition for Encoder learning (Encoder → Discriminator)
    self.enc = self.get_encoder()
    x = Input(shape=(self.input_dim,))
    z_gen = self.enc(x)
    valid = self.dis([x, z_gen])
    enc_dis = Model(inputs=x, outputs=valid, name='enc_to_dis')
    enc_dis.compile(loss=loss, optimizer=optimizer) 
        
    #Model definition for Generator learning (Generator → Discriminator)
    self.gen = self.get_generator()
    z = Input(shape=(self.latent_dim,))
    x_gen = self.gen(z)
    valid = self.dis([x_gen, z])
    gen_dis = Model(inputs=z, outputs=valid, name='gen_to_dis')
    gen_dis.compile(loss=loss, optimizer=optimizer)          
        
    #Training
    min_val_loss = float('inf')
    stop_count = 0
    for i in range(epochs):    
        #Make the Discriminator trainable
        self.dis.trainable = True

        #Randomly sample batch_size//2 rows from the training data (with replacement)
        idx = np.random.randint(0, X_train.shape[0], batch_size//2)
        real_data = X_train[idx]

        #Draw the same number of noise vectors and generate data from each of them
        noise = np.random.normal(0, 1, (len(idx), self.latent_dim))
        gen_data = self.gen.predict(noise)

        #Encode the sampled training data into latent vectors
        enc_noise = self.enc.predict(real_data)

        #Train the Discriminator: (real data, encoded noise) -> 1, (generated data, noise) -> 0
        d_enc_loss = self.dis.train_on_batch([real_data, enc_noise], np.ones((len(real_data), 1)))
        d_gen_loss = self.dis.train_on_batch([gen_data, noise], np.zeros((len(gen_data), 1)))
        d_loss = d_enc_loss + d_gen_loss

        #Freeze the Discriminator
        self.dis.trainable = False

        #Train the Encoder (target label 0)
        e_loss = enc_dis.train_on_batch(real_data, np.zeros((len(real_data), 1)))

        #Train the Generator (target label 1)
        g_loss = gen_dis.train_on_batch(noise, np.ones((len(noise), 1)))
                 
        #If evaluation data is given, compute its loss and check for early stopping
        if len(test)>0:
            #Get the evaluation data
            X_test = test[0]
            y_true = test[1]

            #Compute anomaly scores for the evaluation data and scale them to [0, 1]
            proba = self.predict(X_test)
            proba = minmax_scale(proba)

            #Loss calculation
            val_loss = tf.keras.losses.binary_crossentropy(y_true, proba).numpy()

            #If the evaluation loss improved, update the minimum loss and reset the early stop count
            if min_val_loss > val_loss:
                min_val_loss = val_loss
                stop_count = 0
            #If the evaluation loss has not improved for "early_stop_num" checks, stop training
            elif stop_count >= early_stop_num:
                break
            else:
                stop_count += 1

        #Print training progress
        if verbose==1 and i%100==0:
            if len(test)==0: print(f'epoch{i}-> d_loss:{d_loss}  e_loss:{e_loss}  g_loss:{g_loss}')
            else: print(f'epoch{i}-> d_loss:{d_loss}  e_loss:{e_loss}  g_loss:{g_loss}  val_loss:{val_loss}')
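
The early stopping rule used inside the training loop can be looked at in isolation. The following is a minimal sketch that mirrors the same counter logic in plain Python. The function name `early_stop_epoch` and the list of validation losses are hypothetical and exist only for illustration; in the actual code the loss is computed from the evaluation data at every iteration.

```python
def early_stop_epoch(val_losses, early_stop_num):
    """Return the index of the iteration at which training would stop, or None."""
    min_val_loss = float('inf')
    stop_count = 0
    for epoch, val_loss in enumerate(val_losses):
        if min_val_loss > val_loss:
            # Improvement: remember the new minimum and reset the counter
            min_val_loss = val_loss
            stop_count = 0
        elif stop_count >= early_stop_num:
            # No improvement and the counter has reached the limit: stop here
            return epoch
        else:
            # No improvement yet, but still within the allowed count
            stop_count += 1
    return None

# Hypothetical validation losses: the minimum 0.7 is reached at index 1
losses = [0.9, 0.7, 0.8, 0.75, 0.72, 0.71, 0.6]
stopped_at = early_stop_epoch(losses, early_stop_num=2)
print(stopped_at)
```

With this rule, "early_stop_num" non-improving iterations are tolerated and training stops on the next one that also fails to improve. In the example, the later loss of 0.6 is never reached because the loop has already stopped.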

4. Model inference (anomaly detection)

I implemented inference as follows, with reference to the paper. As in the paper, the anomaly score is calculated with the following formulas (the higher the value, the more anomalous). Here σ denotes the cross-entropy loss, and α corresponds to the "weight" argument of predict.

A(x) = α L_G(x) + (1 - α) L_D(x)    (Anomaly Score)
L_G(x) = ||x - G(E(x))||_1          (Generator Loss)
L_D(x) = σ(D(x, E(x)), 1)           (Discriminator Loss)

Incidentally, in the authors' source code the Discriminator loss is computed as shown below. I was unsure whether to follow the code or the paper, but this implementation uses the formula from the paper given above.

L_D(x) = σ(D(G(E(x)), E(x)), 1)

#Test model
def predict(self, X_test, weight=0.9, degree=1):
        
    #Convert evaluation data to numpy type
    X_test = np.array(X_test)
        
    #Encode the evaluation data into latent vectors
    z_gen = self.enc.predict(X_test)

    #Reconstruct the data from the encoded latent vectors
    reconstructs = self.gen.predict(z_gen)

    #Take the difference between the original and reconstructed data and compute its norm per sample
    #Data similar to the training data should be reconstructed well by the trained Encoder and Generator
    delta = X_test - reconstructs
    gen_score = tf.norm(delta, ord=degree, axis=1).numpy()

    #Feed the Encoder's input and output to the Discriminator
    l_encoder = self.dis.predict([X_test, z_gen])

    #Compute the cross entropy between the Discriminator output and an array of all 1s
    #For data similar to the training data, the Discriminator output should be close to 1
    dis_score = tf.keras.losses.binary_crossentropy(np.ones((len(X_test), 1)), l_encoder).numpy()

    #Return the anomaly score computed from "gen_score" and "dis_score"
    return weight*gen_score + (1-weight)*dis_score
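
To make the score formula concrete, here is a minimal NumPy sketch that computes A(x) from a reconstruction and a Discriminator output that are already given. It is not the implementation above: the function name `anomaly_score`, the sample arrays, and the clipping value `eps` are assumptions made for illustration, and the networks themselves are replaced by hand-written numbers. Because the target label is always 1, the cross entropy reduces to -log of the Discriminator output.

```python
import numpy as np

def anomaly_score(x, x_rec, d_out, weight=0.9, eps=1e-7):
    # L_G: L1 norm of the reconstruction error, one value per sample
    gen_score = np.sum(np.abs(x - x_rec), axis=1)
    # L_D: cross entropy against label 1, which is -log(D(x, E(x)))
    d_out = np.clip(d_out.reshape(-1), eps, 1.0 - eps)
    dis_score = -np.log(d_out)
    # A(x) = weight * L_G + (1 - weight) * L_D
    return weight * gen_score + (1.0 - weight) * dis_score

# Hypothetical values: sample 0 is reconstructed perfectly and judged "real",
# sample 1 is reconstructed poorly and judged "fake"
x = np.array([[1.0, 2.0], [1.0, 2.0]])
x_rec = np.array([[1.0, 2.0], [3.0, 0.0]])
d_out = np.array([[0.9], [0.1]])

scores = anomaly_score(x, x_rec, d_out)
print(scores)
```

The second sample gets a much higher score than the first, because both its reconstruction error and its Discriminator loss are larger. With the default weight of 0.9, the reconstruction term dominates the score.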

Thank you for reading. If you notice any mistakes or have any concerns, I would appreciate it if you pointed them out.
