J'ai décidé de créer une application simple avec la technologie que j'ai apprise en étudiant l'IA. J'ai fait quelque chose comme ce qui suit. La raison pour laquelle j'ai choisi Anpanman était parce que c'était un personnage de dessin animé que je pouvais facilement écrire. Le modèle voulait gérer le GAN et il était possible de détecter des anomalies en apprenant uniquement des images normales. J'ai décidé d'utiliser ANOGAN, mais quand je l'ai recherché, il s'appelait EfficientGAN dans la version haute vitesse d'ANOGAN. Il semble y avoir quelque chose, alors je l'ai choisi. Aussi, par souci de simplicité, je l'ai créé en partant du principe que seul le visage d'Anpanman est identifié.
J'ai rassemblé des images d'Anpanman sur le net en grattant, traité les images et découpé uniquement le visage. De plus, comme décrit plus loin, l'arrière-plan de l'image de la caméra mobile était gris, donc pour apprendre également l'arrière-plan Les données de gradation de gris ont été développées à l'aide de la fonction suivante.
from PIL import Image, ImageOps
import numpy as np
def make_gray_gradation(img, gradation_range=(230, 255)):
"""
Convertit l'arrière-plan de l'image d'entrée en une gradation de gris d'un degré aléatoire
Input :Fichier d'image(La couleur est également acceptable)
Output :Fichier d'image(Image avec fond de conversion de dégradé de gris)
Pramater
img :Image d'entrée
gradation_range :Plage de valeurs RVB à dégradé
"""
gra_range = np.random.randint(*gradation_range)
gray = ImageOps.grayscale(img)
output = ImageOps.colorize(gray, black=(0, 0, 0), white=(gra_range, gra_range, gra_range))
return output
L'image normale est Anpanman qui a écrit l'image d'illustration sur le papier à dessin. J'ai utilisé une image d'Anpanman écrite par moi et mes anciens élèves prise avec un téléphone portable.
L'image anormale est une image d'une illustration de Bikinman, Dokin, etc. prise avec un téléphone portable de la même manière que ci-dessus. J'ai utilisé une mauvaise image d'Anpanman grattée sur le net.
Étant donné que l'arrière-plan de l'image d'entrée prise avec le téléphone mobile était gris, il y a également une gradation de gris dans l'image d'apprentissage Je l'ai inséré, mais je n'ai pas pu le reproduire correctement. Le score dépend également du fait que l'arrière-plan est bien généré, plutôt que du fait que l'image d'Anpanman est bien dessinée. Il semble que cela ait été décidé, et le seuil de discrimination des images anormales n'a pas été bien décidé.
En guise de contre-mesure, rendez l'arrière-plan entièrement blanc pour toutes les images et assurez-vous simplement que le contour de l'image ressemble à Anpanman. J'ai permis de déterminer s'il s'agit d'une image anormale. Voici une fonction qui binarise l'image d'entrée.
import os
import scipy.stats as stats
from PIL import Image
def image_binarization(path_in, path_out, th_zero_num=1400, width=100, height=100):
"""
Le contour de l'image d'entrée est binarisé en noir et blanc et en sortie.
Input :Chemin du dossier où les fichiers image sont stockés(La fin est/) (Seules les images peuvent être placées dans le dossier)
Output :Enregistrez l'image binarisée dans le dossier spécifié. 0 après binarisation(Contour)Sortez le nombre de points.
Pramater
path_in :Chemin du répertoire contenant les images d'entrée
path_out :Chemin du répertoire de sortie
th_zero_num :0 dans l'image(Contour)Valeur MIN du nombre de points de(Si le contour est trop sombre, réduisez-le et ajustez)
width :Taille de la largeur de l'image
height :Taille verticale de l'image
"""
list_in = os.listdir(path_in)
im_np_out = np.empty((0, width*height))
for img in list_in:
path_name = path_in+img
x_img = cv2.imread(path_name)
x_img = cv2.resize(x_img, (width, height))
x_img= cv2.cvtColor(x_img, cv2.COLOR_BGR2GRAY)
x_img = np.array(x_img)
x_img = x_img / 255.0
x_img = x_img.reshape((1, width, height))
x_img = x_img.reshape(1, width*height)
m = stats.mode(x_img)
max_hindo = m.mode[0][0]
for c in reversed(range(50)):
th = (c+1)*0.01
th_0_1 = max_hindo-th
x_img_ = np.where(x_img>th_0_1, 1, 0)
if (np.count_nonzero(x_img_ == 0))>th_zero_num:
break
display(np.count_nonzero(x_img_ == 0))
x_img = x_img_.reshape(width, height)
x_img = (x_img * 2.0) - 1.0
img_np_255 = (x_img + 1.0) * 127.5
img_np_255_mod1 = np.maximum(img_np_255, 0)
img_np_255_mod1 = np.minimum(img_np_255_mod1, 255)
img_np_uint8 = img_np_255_mod1.astype(np.uint8)
image = Image.fromarray(img_np_uint8)
image.save(path_out+img, quality=95)
La distribution des scores a été divisée dans une certaine mesure entre l'image correcte et l'image anormale, donc pour le moment, à peu près Il semble que vous puissiez décider du seuil qui sépare normal et anormal. (Le seuil a été fixé à 0,40372)
Entrez l'image Anpanman que vous avez écrite à travers l'image d'illustration de seulement 5 feuilles, D'autres sont entrés dans 14 pauvres Anpanman grattés sur le net. (Entrez après la binarisation en utilisant la fonction de binarisation ci-dessus)
Le résultat du jugement est représenté par la couleur d'arrière-plan du cadre d'affichage du score. Si l'arrière-plan est bleu, c'est une image normale, et s'il est rouge, c'est une image anormale. En conséquence, 4/5 est considéré comme normal pour Anpanman qui voit à travers l'illustration. 8/14 est un jugement anormal pour un autre Anpanman mal gratté Le taux de réponse correcte était de 12/19 = 63,15%. Je dois améliorer la précision.
・ Même si j'ai pris une photo dessinée sur du papier à dessin blanc pur, l'arrière-plan de l'image réelle était gris. En ressentant l'ampleur de l'influence de la lumière et en concevant des méthodes telles que la binarisation, en limitant les conditions à celles qui sont plus faciles à apprendre. J'ai appris que la précision pouvait être améliorée. ・ GrobalAvaragePooling, LeakyReLu, bruit de couche, etc. pour améliorer la précision du GAN Je pense que c'était bien de pouvoir essayer différentes méthodes d'amélioration de la précision. (Bien que le résultat ne se soit pas beaucoup amélioré)
Je voudrais étudier et essayer diverses mesures pour améliorer la précision du GAN. De plus, j'avais l'habitude d'utiliser Google Colab et EC2 d'AWS, mais à l'avenir, diverses choses telles que SageMaker d'AWS et GCP Je souhaite étudier en utilisant le cloud.
train_BiGAN.py
import numpy as np
import os
import tensorflow as tf
import utility as Utility
import argparse
import matplotlib.pyplot as plt
from model_BiGAN import BiGAN as Model
from make_datasets_TRAIN import Make_datasets_TRAIN as Make_datasets
def parser():
parser = argparse.ArgumentParser(description='train LSGAN')
parser.add_argument('--batch_size', '-b', type=int, default=300, help='Number of images in each mini-batch')
parser.add_argument('--log_file_name', '-lf', type=str, default='anpanman', help='log file name')
parser.add_argument('--epoch', '-e', type=int, default=1001, help='epoch')
parser.add_argument('--file_train_data', '-ftd', type=str, default='../Train_Data/191103/', help='train data')
parser.add_argument('--test_true_data', '-ttd', type=str, default='../Valid_True_Data/191103/', help='test of true_data')
parser.add_argument('--test_false_data', '-tfd', type=str, default='../Valid_False_Data/191103/', help='test of false_data')
parser.add_argument('--valid_span', '-vs', type=int, default=100, help='validation span')
return parser.parse_args()
args = parser()
#global variants
BATCH_SIZE = args.batch_size
LOGFILE_NAME = args.log_file_name
EPOCH = args.epoch
FILE_NAME = args.file_train_data
TRUE_DATA = args.test_true_data
FALSE_DATA = args.test_false_data
IMG_WIDTH = 100
IMG_HEIGHT = 100
IMG_CHANNEL = 1
BASE_CHANNEL = 32
NOISE_UNIT_NUM = 200
NOISE_MEAN = 0.0
NOISE_STDDEV = 1.0
TEST_DATA_SAMPLE = 5 * 5
L2_NORM = 0.001
KEEP_PROB_RATE = 0.5
SEED = 1234
SCORE_ALPHA = 0.9 # using for cost function
VALID_SPAN = args.valid_span
np.random.seed(seed=SEED)
BOARD_DIR_NAME = './tensorboard/' + LOGFILE_NAME
OUT_IMG_DIR = './out_images_BiGAN' #output image file
out_model_dir = './out_models_BiGAN/' #output model_ckpt file
#Load_model_dir = '../model_ckpt/' #Load model_ckpt file
OUT_HIST_DIR = './out_score_hist_BiGAN' #output histogram file
CYCLE_LAMBDA = 1.0
try:
os.mkdir('log')
os.mkdir('out_graph')
os.mkdir(OUT_IMG_DIR)
os.mkdir(out_model_dir)
os.mkdir(OUT_HIST_DIR)
os.mkdir('./out_images_Debug') #for debug
except:
pass
make_datasets = Make_datasets(FILE_NAME, TRUE_DATA, FALSE_DATA, IMG_WIDTH, IMG_HEIGHT, SEED)
model = Model(NOISE_UNIT_NUM, IMG_CHANNEL, SEED, BASE_CHANNEL, KEEP_PROB_RATE)
z_ = tf.placeholder(tf.float32, [None, NOISE_UNIT_NUM], name='z_') #noise to generator
x_ = tf.placeholder(tf.float32, [None, IMG_HEIGHT, IMG_WIDTH, IMG_CHANNEL], name='x_') #image to classifier
d_dis_f_ = tf.placeholder(tf.float32, [None, 1], name='d_dis_g_') #target of discriminator related to generator
d_dis_r_ = tf.placeholder(tf.float32, [None, 1], name='d_dis_r_') #target of discriminator related to real image
is_training_ = tf.placeholder(tf.bool, name = 'is_training')
with tf.variable_scope('encoder_model'):
z_enc = model.encoder(x_, reuse=False, is_training=is_training_)
with tf.variable_scope('decoder_model'):
x_dec = model.decoder(z_, reuse=False, is_training=is_training_)
x_z_x = model.decoder(z_enc, reuse=True, is_training=is_training_) # for cycle consistency
with tf.variable_scope('discriminator_model'):
#stream around discriminator
drop3_r, logits_r = model.discriminator(x_, z_enc, reuse=False, is_training=is_training_) #real pair
drop3_f, logits_f = model.discriminator(x_dec, z_, reuse=True, is_training=is_training_) #real pair
drop3_re, logits_re = model.discriminator(x_z_x, z_enc, reuse=True, is_training=is_training_) #fake pair
with tf.name_scope("loss"):
loss_dis_f = tf.reduce_mean(tf.square(logits_f - d_dis_f_), name='Loss_dis_gen') #loss related to generator
loss_dis_r = tf.reduce_mean(tf.square(logits_r - d_dis_r_), name='Loss_dis_rea') #loss related to real image
#total loss
loss_dis_total = loss_dis_f + loss_dis_r
loss_dec_total = loss_dis_f
loss_enc_total = loss_dis_r
with tf.name_scope("score"):
l_g = tf.reduce_mean(tf.abs(x_ - x_z_x), axis=(1,2,3))
l_FM = tf.reduce_mean(tf.abs(drop3_r - drop3_re), axis=1)
score_A = SCORE_ALPHA * l_g + (1.0 - SCORE_ALPHA) * l_FM
with tf.name_scope("optional_loss"):
loss_dec_opt = loss_dec_total + CYCLE_LAMBDA * l_g
loss_enc_opt = loss_enc_total + CYCLE_LAMBDA * l_g
tf.summary.scalar('loss_dis_total', loss_dis_total)
tf.summary.scalar('loss_dec_total', loss_dec_total)
tf.summary.scalar('loss_enc_total', loss_enc_total)
merged = tf.summary.merge_all()
# t_vars = tf.trainable_variables()
dec_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope="decoder")
enc_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope="encoder")
dis_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope="discriminator")
with tf.name_scope("train"):
train_dis = tf.train.AdamOptimizer(learning_rate=0.0001, beta1=0.5).minimize(loss_dis_total, var_list=dis_vars
, name='Adam_dis')
train_dec = tf.train.AdamOptimizer(learning_rate=0.01, beta1=0.5).minimize(loss_dec_total, var_list=dec_vars
, name='Adam_dec')
train_enc = tf.train.AdamOptimizer(learning_rate=0.005, beta1=0.5).minimize(loss_enc_total, var_list=enc_vars
, name='Adam_enc')
train_dec_opt = tf.train.AdamOptimizer(learning_rate=0.005, beta1=0.5).minimize(loss_dec_opt, var_list=dec_vars
, name='Adam_dec')
train_enc_opt = tf.train.AdamOptimizer(learning_rate=0.005, beta1=0.5).minimize(loss_enc_opt, var_list=enc_vars
, name='Adam_enc')
sess = tf.Session()
ckpt = tf.train.get_checkpoint_state(out_model_dir)
saver = tf.train.Saver()
if ckpt: #S'il y a un point de contrôle
last_model = ckpt.model_checkpoint_path #Chemin vers le dernier modèle enregistré
saver.restore(sess, last_model) #Lecture de données variables
print("load " + last_model)
else: #Lorsqu'il n'y a pas de données enregistrées
#init = tf.initialize_all_variables()
sess.run(tf.global_variables_initializer())
summary_writer = tf.summary.FileWriter(BOARD_DIR_NAME, sess.graph)
log_list = []
log_list.append(['epoch', 'AUC'])
#training loop
for epoch in range(0, EPOCH):
sum_loss_dis_f = np.float32(0)
sum_loss_dis_r = np.float32(0)
sum_loss_dis_total = np.float32(0)
sum_loss_dec_total = np.float32(0)
sum_loss_enc_total = np.float32(0)
len_data = make_datasets.make_data_for_1_epoch()
for i in range(0, len_data, BATCH_SIZE):
img_batch = make_datasets.get_data_for_1_batch(i, BATCH_SIZE)
z = make_datasets.make_random_z_with_norm(NOISE_MEAN, NOISE_STDDEV, len(img_batch), NOISE_UNIT_NUM)
tar_g_1 = make_datasets.make_target_1_0(1.0, len(img_batch)) #1 -> real
tar_g_0 = make_datasets.make_target_1_0(0.0, len(img_batch)) #0 -> fake
#train discriminator
sess.run(train_dis, feed_dict={z_:z, x_: img_batch, d_dis_f_: tar_g_0, d_dis_r_: tar_g_1, is_training_:True})
#train decoder
sess.run(train_dec, feed_dict={z_:z, d_dis_f_: tar_g_1, is_training_:True})
# sess.run(train_dec_opt, feed_dict={z_:z, x_: img_batch, d_dis_f_: tar_g_1, is_training_:True})
#train encoder
sess.run(train_enc, feed_dict={x_:img_batch, d_dis_r_: tar_g_0, is_training_:True})
# sess.run(train_enc_opt, feed_dict={x_:img_batch, d_dis_r_: tar_g_0, is_training_:True})
# loss for discriminator
loss_dis_total_, loss_dis_r_, loss_dis_f_ = sess.run([loss_dis_total, loss_dis_r, loss_dis_f],
feed_dict={z_: z, x_: img_batch, d_dis_f_: tar_g_0,
d_dis_r_: tar_g_1, is_training_:False})
#loss for decoder
loss_dec_total_ = sess.run(loss_dec_total, feed_dict={z_: z, d_dis_f_: tar_g_1, is_training_:False})
#loss for encoder
loss_enc_total_ = sess.run(loss_enc_total, feed_dict={x_: img_batch, d_dis_r_: tar_g_0, is_training_:False})
#for tensorboard
merged_ = sess.run(merged, feed_dict={z_:z, x_: img_batch, d_dis_f_: tar_g_0, d_dis_r_: tar_g_1, is_training_:False})
summary_writer.add_summary(merged_, epoch)
sum_loss_dis_f += loss_dis_f_
sum_loss_dis_r += loss_dis_r_
sum_loss_dis_total += loss_dis_total_
sum_loss_dec_total += loss_dec_total_
sum_loss_enc_total += loss_enc_total_
print("----------------------------------------------------------------------")
print("epoch = {:}, Encoder Total Loss = {:.4f}, Decoder Total Loss = {:.4f}, Discriminator Total Loss = {:.4f}".format(
epoch, sum_loss_enc_total / len_data, sum_loss_dec_total / len_data, sum_loss_dis_total / len_data))
print("Discriminator Real Loss = {:.4f}, Discriminator Generated Loss = {:.4f}".format(
sum_loss_dis_r / len_data, sum_loss_dis_r / len_data))
if epoch % VALID_SPAN == 0:
# score_A_list = []
score_A_np = np.zeros((0, 2), dtype=np.float32)
val_data_num = len(make_datasets.valid_data)
val_true_data_num = len(make_datasets.valid_true_np)
val_false_data_num = len(make_datasets.valid_false_np)
img_batch_1, _ = make_datasets.get_valid_data_for_1_batch(0, val_true_data_num)
img_batch_0, _ = make_datasets.get_valid_data_for_1_batch(val_data_num - val_false_data_num, val_true_data_num)
x_z_x_1 = sess.run(x_z_x, feed_dict={x_:img_batch_1, is_training_:False})
x_z_x_0 = sess.run(x_z_x, feed_dict={x_:img_batch_0, is_training_:False})
score_A_1 = sess.run(score_A, feed_dict={x_:img_batch_1, is_training_:False})
score_A_0 = sess.run(score_A, feed_dict={x_:img_batch_0, is_training_:False})
score_A_re_1 = np.reshape(score_A_1, (-1, 1))
score_A_re_0 = np.reshape(score_A_0, (-1, 1))
tars_batch_1 = np.ones(val_true_data_num)
tars_batch_0 = np.zeros(val_false_data_num)
tars_batch_re_1 = np.reshape(tars_batch_1, (-1, 1))
tars_batch_re_0 = np.reshape(tars_batch_0, (-1, 1))
score_A_np_1_tmp = np.concatenate((score_A_re_1, tars_batch_re_1), axis=1)
score_A_np_0_tmp = np.concatenate((score_A_re_0, tars_batch_re_0), axis=1)
score_A_np = np.concatenate((score_A_np_1_tmp, score_A_np_0_tmp), axis=0)
#print(score_A_np)
tp, fp, tn, fn, precision, recall = Utility.compute_precision_recall(score_A_np)
auc = Utility.make_ROC_graph(score_A_np, 'out_graph/' + LOGFILE_NAME, epoch)
print("tp:{}, fp:{}, tn:{}, fn:{}, precision:{:.4f}, recall:{:.4f}, AUC:{:.4f}".format(tp, fp, tn, fn, precision, recall, auc))
log_list.append([epoch, auc])
Utility.make_score_hist(score_A_1, score_A_0, epoch, LOGFILE_NAME, OUT_HIST_DIR)
Utility.make_output_img(img_batch_1, img_batch_0, x_z_x_1, x_z_x_0, score_A_0, score_A_1, epoch, LOGFILE_NAME, OUT_IMG_DIR)
#after learning
Utility.save_list_to_csv(log_list, 'log/' + LOGFILE_NAME + '_auc.csv')
#saver2 = tf.train.Saver()
save_path = saver.save(sess, out_model_dir + 'anpanman_weight.ckpt')
print("Model saved in file: ", save_path)
model_BiGAN.py
import numpy as np
# import os
import tensorflow as tf
# from PIL import Image
# import utility as Utility
# import argparse
class BiGAN():
def __init__(self, noise_unit_num, img_channel, seed, base_channel, keep_prob):
self.NOISE_UNIT_NUM = noise_unit_num # 200
self.IMG_CHANNEL = img_channel # 1
self.SEED = seed
np.random.seed(seed=self.SEED)
self.BASE_CHANNEL = base_channel # 32
self.KEEP_PROB = keep_prob
def leaky_relu(self, x, alpha):
return tf.nn.relu(x) - alpha * tf.nn.relu(-x)
def gaussian_noise(self, input, std): #used at discriminator
noise = tf.random_normal(shape=tf.shape(input), mean=0.0, stddev=std, dtype=tf.float32, seed=self.SEED)
return input + noise
def conv2d(self, input, in_channel, out_channel, k_size, stride, seed):
w = tf.get_variable('w', [k_size, k_size, in_channel, out_channel],
initializer=tf.random_normal_initializer
(mean=0.0, stddev=0.02, seed=seed), dtype=tf.float32)
b = tf.get_variable('b', [out_channel], initializer=tf.constant_initializer(0.0))
conv = tf.nn.conv2d(input, w, strides=[1, stride, stride, 1], padding="SAME", name='conv') + b
return conv
def conv2d_transpose(self, input, in_channel, out_channel, k_size, stride, seed):
w = tf.get_variable('w', [k_size, k_size, out_channel, in_channel],
initializer=tf.random_normal_initializer
(mean=0.0, stddev=0.02, seed=seed), dtype=tf.float32)
b = tf.get_variable('b', [out_channel], initializer=tf.constant_initializer(0.0))
out_shape = tf.stack(
[tf.shape(input)[0], tf.shape(input)[1] * 2, tf.shape(input)[2] * 2, tf.constant(out_channel)])
deconv = tf.nn.conv2d_transpose(input, w, output_shape=out_shape, strides=[1, stride, stride, 1],
padding="SAME") + b
return deconv
def batch_norm(self, input):
shape = input.get_shape().as_list()
n_out = shape[-1]
scale = tf.get_variable('scale', [n_out], initializer=tf.constant_initializer(1.0))
beta = tf.get_variable('beta', [n_out], initializer=tf.constant_initializer(0.0))
batch_mean, batch_var = tf.nn.moments(input, [0])
bn = tf.nn.batch_normalization(input, batch_mean, batch_var, beta, scale, 0.0001, name='batch_norm')
return bn
def fully_connect(self, input, in_num, out_num, seed):
w = tf.get_variable('w', [in_num, out_num], initializer=tf.random_normal_initializer
(mean=0.0, stddev=0.02, seed=seed), dtype=tf.float32)
b = tf.get_variable('b', [out_num], initializer=tf.constant_initializer(0.0))
fc = tf.matmul(input, w, name='fc') + b
return fc
def encoder(self, x, reuse=False, is_training=False): #x is expected [n, 28, 28, 1]
with tf.variable_scope('encoder', reuse=reuse):
with tf.variable_scope("layer1"): # layer1 conv nx28x28x1 -> nx14x14x32
conv1 = self.conv2d(x, self.IMG_CHANNEL, self.BASE_CHANNEL, 3, 2, self.SEED)
with tf.variable_scope("layer2"): # layer2 conv nx14x14x32 -> nx7x7x64
conv2 = self.conv2d(conv1, self.BASE_CHANNEL, self.BASE_CHANNEL*2, 3, 2, self.SEED)
bn2 = self.batch_norm(conv2)
lr2 = self.leaky_relu(bn2, alpha=0.1)
with tf.variable_scope("layer3"): # layer3 conv nx7x7x64 -> nx4x4x128
conv3 = self.conv2d(lr2, self.BASE_CHANNEL*2, self.BASE_CHANNEL*4, 3, 2, self.SEED)
bn3 = self.batch_norm(conv3)
lr3 = self.leaky_relu(bn3, alpha=0.1)
with tf.variable_scope("layer4"): # layer4 fc nx4x4x128 -> nx200
shape = tf.shape(lr3)
print(shape[1])
reshape4 = tf.reshape(lr3, [shape[0], shape[1]*shape[2]*shape[3]])
fc4 = self.fully_connect(reshape4, 21632, self.NOISE_UNIT_NUM, self.SEED)
return fc4
def decoder(self, z, reuse=False, is_training=False): # z is expected [n, 200]
with tf.variable_scope('decoder', reuse=reuse):
with tf.variable_scope("layer1"): # layer1 fc nx200 -> nx1024
fc1 = self.fully_connect(z, self.NOISE_UNIT_NUM, 1024, self.SEED)
bn1 = self.batch_norm(fc1)
rl1 = tf.nn.relu(bn1)
with tf.variable_scope("layer2"): # layer2 fc nx1024 -> nx6272
fc2 = self.fully_connect(rl1, 1024, 25*25*self.BASE_CHANNEL*4, self.SEED)
bn2 = self.batch_norm(fc2)
rl2 = tf.nn.relu(bn2)
with tf.variable_scope("layer3"): # layer3 deconv nx6272 -> nx7x7x128 -> nx14x14x64
shape = tf.shape(rl2)
reshape3 = tf.reshape(rl2, [shape[0], 25, 25, 128])
deconv3 = self.conv2d_transpose(reshape3, self.BASE_CHANNEL*4, self.BASE_CHANNEL*2, 4, 2, self.SEED)
bn3 = self.batch_norm(deconv3)
rl3 = tf.nn.relu(bn3)
with tf.variable_scope("layer4"): # layer3 deconv nx14x14x64 -> nx28x28x1
deconv4 = self.conv2d_transpose(rl3, self.BASE_CHANNEL*2, self.IMG_CHANNEL, 4, 2, self.SEED)
tanh4 = tf.tanh(deconv4)
return tanh4
def discriminator(self, x, z, reuse=False, is_training=True): #z[n, 200], x[n, 28, 28, 1]
with tf.variable_scope('discriminator', reuse=reuse):
with tf.variable_scope("x_layer1"): # layer x1 conv [n, 28, 28, 1] -> [n, 14, 14, 64]
convx1 = self.conv2d(x, self.IMG_CHANNEL, self.BASE_CHANNEL*2, 4, 2, self.SEED)
lrx1 = self.leaky_relu(convx1, alpha=0.1)
dropx1 = tf.layers.dropout(lrx1, rate=1.0 - self.KEEP_PROB, name='dropout', training=is_training)
with tf.variable_scope("x_layer2"): # layer x2 conv [n, 14, 14, 64] -> [n, 7, 7, 64] -> [n, 3136]
convx2 = self.conv2d(dropx1, self.BASE_CHANNEL*2, self.BASE_CHANNEL*2, 4, 2, self.SEED)
bnx2 = self.batch_norm(convx2)
lrx2 = self.leaky_relu(bnx2, alpha=0.1)
dropx2 = tf.layers.dropout(lrx2, rate=1.0 - self.KEEP_PROB, name='dropout', training=is_training)
shapex2 = tf.shape(dropx2)
reshape3 = tf.reshape(dropx2, [shapex2[0], shapex2[1]*shapex2[2]*shapex2[3]])
with tf.variable_scope("z_layer1"): # layer1 fc [n, 200] -> [n, 512]
fcz1 = self.fully_connect(z, self.NOISE_UNIT_NUM, 512, self.SEED)
lrz1 = self.leaky_relu(fcz1, alpha=0.1)
dropz1 = tf.layers.dropout(lrz1, rate=1.0 - self.KEEP_PROB, name='dropout', training=is_training)
with tf.variable_scope("y_layer3"): # layer1 fc [n, 6272], [n, 1024]
con3 = tf.concat([reshape3, dropz1], axis=1)
fc3 = self.fully_connect(con3, 40000+512, 1024, self.SEED)
lr3 = self.leaky_relu(fc3, alpha=0.1)
self.drop3 = tf.layers.dropout(lr3, rate=1.0 - self.KEEP_PROB, name='dropout', training=is_training)
with tf.variable_scope("y_fc_logits"):
self.logits = self.fully_connect(self.drop3, 1024, 1, self.SEED)
return self.drop3, self.logits
make_datasets_TRAIN.py
import numpy as np
import os
import glob
import re
import random
#import cv2
from PIL import Image
from keras.preprocessing import image
class Make_datasets_TRAIN():
def __init__(self, filename, true_data, false_data, img_width, img_height, seed):
self.filename = filename
self.true_data = true_data
self.false_data = false_data
self.img_width = img_width
self.img_height = img_height
self.seed = seed
x_train, x_valid_true, x_valid_false, y_train, y_valid_true, y_valid_false = self.read_DATASET(self.filename, self.true_data, self.false_data)
self.train_np = np.concatenate((y_train.reshape(-1,1), x_train), axis=1).astype(np.float32)
self.valid_true_np = np.concatenate((y_valid_true.reshape(-1,1), x_valid_true), axis=1).astype(np.float32)
self.valid_false_np = np.concatenate((y_valid_false.reshape(-1,1), x_valid_false), axis=1).astype(np.float32)
print("self.train_np.shape, ", self.train_np.shape)
print("self.valid_true_np.shape, ", self.valid_true_np.shape)
print("self.valid_false_np.shape, ", self.valid_false_np.shape)
print("np.max(x_train), ", np.max(x_train))
print("np.min(x_train), ", np.min(x_train))
self.valid_data = np.concatenate((self.valid_true_np, self.valid_false_np))
random.seed(self.seed)
np.random.seed(self.seed)
def read_DATASET(self, train_path, true_path, false_path):
train_list = os.listdir(train_path)
y_train = np.ones(len(train_list))
x_train = np.empty((0, self.img_width*self.img_height))
for img in train_list:
path_name = train_path+img
x_img = Image.open(path_name)
#Alignez la taille
x_img = x_img.resize((self.img_width, self.img_height))
#Convertir 3 canaux en 1 canal
x_img= x_img.convert('L')
# PIL.Image.De l'image au tableau numpy
x_img = np.array(x_img)
#Normalisation
x_img = x_img / 255.0
#Ajouter un axe
x_img = x_img.reshape((1,self.img_width, self.img_height))
# flatten
x_img = x_img.reshape(1, self.img_width*self.img_height)
x_train = np.concatenate([x_train, x_img], axis = 0)
print("x_train.shape, ", x_train.shape)
print("y_train.shape, ", y_train.shape)
test_true_list = os.listdir(true_path)
y_test_true = np.ones(len(test_true_list))
x_test_true = np.empty((0, self.img_width*self.img_height))
for img in test_true_list:
path_name = true_path+img
x_img = Image.open(path_name)
x_img = x_img.resize((self.img_width, self.img_height))
x_img= x_img.convert('L')
x_img = np.array(x_img)
x_img = x_img / 255.0
x_img = x_img.reshape((1,self.img_width, self.img_height))
x_img = x_img.reshape(1, self.img_width*self.img_height)
x_test_true = np.concatenate([x_test_true, x_img], axis = 0)
print("x_test_true.shape, ", x_test_true.shape)
print("y_test_true.shape, ", y_test_true.shape)
test_false_list = os.listdir(false_path)
y_test_false = np.zeros(len(test_false_list))
x_test_false = np.empty((0, self.img_width*self.img_height))
for img in test_false_list:
path_name = false_path+img
x_img = Image.open(path_name)
x_img = x_img.resize((self.img_width, self.img_height))
x_img= x_img.convert('L')
x_img = np.array(x_img)
x_img = x_img / 255.0
x_img = x_img.reshape((1,self.img_width, self.img_height))
x_img = x_img.reshape(1, self.img_width*self.img_height)
x_test_false = np.concatenate([x_test_false, x_img], axis = 0)
print("x_test_false.shape, ", x_test_false.shape)
print("y_test_false.shape, ", y_test_false.shape)
return x_train, x_test_true, x_test_false, y_train, y_test_true, y_test_false
def get_file_names(self, dir_name):
target_files = []
for root, dirs, files in os.walk(dir_name):
targets = [os.path.join(root, f) for f in files]
target_files.extend(targets)
return target_files
def read_data(self, d_y_np, width, height):
tars = []
images = []
for num, d_y_1 in enumerate(d_y_np):
image = d_y_1[1:].reshape(width, height, 1)
tar = d_y_1[0]
images.append(image)
tars.append(tar)
return np.asarray(images), np.asarray(tars)
def normalize_data(self, data):
# data0_2 = data / 127.5
# data_norm = data0_2 - 1.0
data_norm = (data * 2.0) - 1.0 #applied for tanh
return data_norm
def make_data_for_1_epoch(self):
self.filename_1_epoch = np.random.permutation(self.train_np)
return len(self.filename_1_epoch)
def get_data_for_1_batch(self, i, batchsize):
filename_batch = self.filename_1_epoch[i:i + batchsize]
images, _ = self.read_data(filename_batch, self.img_width, self.img_height)
images_n = self.normalize_data(images)
return images_n
def get_valid_data_for_1_batch(self, i, batchsize):
filename_batch = self.valid_data[i:i + batchsize]
images, tars = self.read_data(filename_batch, self.img_width, self.img_height)
images_n = self.normalize_data(images)
return images_n, tars
def make_random_z_with_norm(self, mean, stddev, data_num, unit_num):
norms = np.random.normal(mean, stddev, (data_num, unit_num))
# tars = np.zeros((data_num, 1), dtype=np.float32)
return norms
def make_target_1_0(self, value, data_num):
if value == 0.0:
target = np.zeros((data_num, 1), dtype=np.float32)
elif value == 1.0:
target = np.ones((data_num, 1), dtype=np.float32)
else:
print("target value error")
return target
utility.py
import numpy as np
# import os
from PIL import Image
import matplotlib.pyplot as plt
import sklearn.metrics as sm
import csv
import seaborn as sns
def compute_precision_recall(score_A_np, ):
array_1 = np.where(score_A_np[:, 1] == 1.0)
array_0 = np.where(score_A_np[:, 1] == 0.0)
mean_1 = np.mean((score_A_np[array_1])[:, 0])
mean_0 = np.mean((score_A_np[array_0])[:, 0])
medium = (mean_1 + mean_0) / 2.0
print("mean_positive_score, ", mean_1)
print("mean_negative_score, ", mean_0)
print("score_threshold(pos_neg middle), ", medium)
np.save('./score_threshold.npy', medium)
array_upper = np.where(score_A_np[:, 0] >= medium)[0]
array_lower = np.where(score_A_np[:, 0] < medium)[0]
#print(array_upper)
print("negative_predict_num, ", array_upper.shape)
print("positive_predict_num, ", array_lower.shape)
array_1_tf = np.where(score_A_np[:, 1] == 1.0)[0]
array_0_tf = np.where(score_A_np[:, 1] == 0.0)[0]
#print(array_1_tf)
print("negative_fact_num, ", array_0_tf.shape)
print("positive_fact_num, ", array_1_tf.shape)
tn = len(set(array_lower)&set(array_1_tf))
tp = len(set(array_upper)&set(array_0_tf))
fp = len(set(array_lower)&set(array_0_tf))
fn = len(set(array_upper)&set(array_1_tf))
precision = tp / (tp + fp + 0.00001)
recall = tp / (tp + fn + 0.00001)
return tp, fp, tn, fn, precision, recall
def score_divide(score_A_np):
array_1 = np.where(score_A_np[:, 1] == 1.0)[0]
array_0 = np.where(score_A_np[:, 1] == 0.0)[0]
print("positive_predict_num, ", array_1.shape)
print("negative_predict_num, ", array_0.shape)
array_1_np = score_A_np[array_1][:, 0]
array_0_np = score_A_np[array_0][:, 0]
#print(array_1_np)
#print(array_0_np)
return array_1_np, array_0_np
def save_graph(x, y, filename, epoch):
plt.figure(figsize=(7, 5))
plt.plot(x, y)
plt.title('ROC curve ' + filename + ' epoch:' + str(epoch))
# x axis label
plt.xlabel("FP / (FP + TN)")
# y axis label
plt.ylabel("TP / (TP + FN)")
# save
plt.savefig(filename + '_ROC_curve_epoch' + str(epoch) +'.png')
plt.close()
def make_ROC_graph(score_A_np, filename, epoch):
argsort = np.argsort(score_A_np, axis=0)[:, 0]
value_1_0 = score_A_np[argsort][::-1].astype(np.float32)
#value_1_0 = (np.where(score_A_np_sort[:, 1] == 7., 1., 0.)).astype(np.float32)
# score_A_np_sort_0_1 = np.concatenate((score_A_np_sort, value_1_0), axis=1)
sum_1 = np.sum(value_1_0)
len_s = len(score_A_np)
sum_0 = len_s - sum_1
tp = np.cumsum(value_1_0[:, 1]).astype(np.float32)
index = np.arange(1, len_s + 1, 1).astype(np.float32)
fp = index - tp
fn = sum_1 - tp
tn = sum_0 - fp
tp_ratio = tp / (tp + fn + 0.00001)
fp_ratio = fp / (fp + tn + 0.00001)
save_graph(fp_ratio, tp_ratio, filename, epoch)
auc = sm.auc(fp_ratio, tp_ratio)
return auc
def unnorm_img(img_np):
img_np_255 = (img_np + 1.0) * 127.5
img_np_255_mod1 = np.maximum(img_np_255, 0)
img_np_255_mod1 = np.minimum(img_np_255_mod1, 255)
img_np_uint8 = img_np_255_mod1.astype(np.uint8)
return img_np_uint8
def convert_np2pil(images_255):
list_images_PIL = []
for num, images_255_1 in enumerate(images_255):
# img_255_tile = np.tile(images_255_1, (1, 1, 3))
image_1_PIL = Image.fromarray(images_255_1)
list_images_PIL.append(image_1_PIL)
return list_images_PIL
def make_score_hist(score_a_1, score_a_0, epoch, LOGFILE_NAME, OUT_HIST_DIR):
list_1 = score_a_1.tolist()
list_0 = score_a_0.tolist()
#print(list_1)
#print(list_0)
plt.figure(figsize=(7, 5))
plt.title("Histgram of Score")
plt.xlabel("Score")
plt.ylabel("freq")
plt.hist(list_1, bins=40, alpha=0.3, histtype='stepfilled', color='r', label="1")
plt.hist(list_0, bins=40, alpha=0.3, histtype='stepfilled', color='b', label='0')
plt.legend(loc=1)
plt.savefig(OUT_HIST_DIR + "/resultScoreHist_"+ LOGFILE_NAME + '_' + str(epoch) + ".png ")
plt.show()
def make_score_hist_test(score_a_1, score_a_0, score_th, LOGFILE_NAME, OUT_HIST_DIR):
list_1 = score_a_1.tolist()
list_0 = score_a_0.tolist()
#print(list_1)
#print(list_0)
plt.figure(figsize=(7, 5))
plt.title("Histgram of Score")
plt.xlabel("Score")
plt.ylabel("freq")
plt.hist(list_1, bins=40, alpha=0.3, histtype='stepfilled', color='r', label="1")
plt.hist(list_0, bins=40, alpha=0.3, histtype='stepfilled', color='b', label='0')
plt.legend(loc=1)
plt.savefig(OUT_HIST_DIR + "/resultScoreHist_"+ LOGFILE_NAME + "_test.png ")
plt.show()
def make_score_bar(score_a):
score_a = score_a.tolist()
list_images_PIL = []
for score in score_a:
x="score"
plt.bar(x,score,label=score)
fig, ax = plt.subplots(figsize=(1, 1))
ax.bar(x,score,label=round(score,3))
ax.legend(loc='center', fontsize=12)
fig.canvas.draw()
#im = np.array(fig.canvas.renderer.buffer_rgba()) #matplotlib vaut 3.Après 1
im = np.array(fig.canvas.renderer._renderer)
image_1_PIL = Image.fromarray(im)
list_images_PIL.append(image_1_PIL)
return list_images_PIL
def make_score_bar_predict(score_A_np_tmp):
score_a = score_A_np_tmp.tolist()
list_images_PIL = []
for score in score_a:
x="score"
#plt.bar(x,score[0],label=score)
fig, ax = plt.subplots(figsize=(1, 1))
if score[1]==0:
ax.bar(x,score[0], color='red',label=round(score[0],3))
else:
ax.bar(x,score[0], color='blue',label=round(score[0],3))
ax.legend(loc='center', fontsize=12)
fig.canvas.draw()
#im = np.array(fig.canvas.renderer.buffer_rgba()) #matplotlib vaut 3.Après 1
im = np.array(fig.canvas.renderer._renderer)
image_1_PIL = Image.fromarray(im)
list_images_PIL.append(image_1_PIL)
return list_images_PIL
def make_output_img(img_batch_1, img_batch_0, x_z_x_1, x_z_x_0, score_a_0, score_a_1, epoch, log_file_name, out_img_dir):
(data_num, img1_h, img1_w, _) = img_batch_1.shape
img_batch_1_unn = np.tile(unnorm_img(img_batch_1), (1, 1, 3))
img_batch_0_unn = np.tile(unnorm_img(img_batch_0), (1, 1, 3))
x_z_x_1_unn = np.tile(unnorm_img(x_z_x_1), (1, 1, 3))
x_z_x_0_unn = np.tile(unnorm_img(x_z_x_0), (1, 1, 3))
diff_1 = img_batch_1 - x_z_x_1
diff_1_r = (2.0 * np.maximum(diff_1, 0.0)) - 1.0 #(0.0, 1.0) -> (-1.0, 1.0)
diff_1_b = (2.0 * np.abs(np.minimum(diff_1, 0.0))) - 1.0 #(-1.0, 0.0) -> (1.0, 0.0) -> (1.0, -1.0)
diff_1_g = diff_1_b * 0.0 - 1.0
diff_1_r_unnorm = unnorm_img(diff_1_r)
diff_1_b_unnorm = unnorm_img(diff_1_b)
diff_1_g_unnorm = unnorm_img(diff_1_g)
diff_1_np = np.concatenate((diff_1_r_unnorm, diff_1_g_unnorm, diff_1_b_unnorm), axis=3)
diff_0 = img_batch_0 - x_z_x_0
diff_0_r = (2.0 * np.maximum(diff_0, 0.0)) - 1.0 #(0.0, 1.0) -> (-1.0, 1.0)
diff_0_b = (2.0 * np.abs(np.minimum(diff_0, 0.0))) - 1.0 #(-1.0, 0.0) -> (1.0, 0.0) -> (1.0, -1.0)
diff_0_g = diff_0_b * 0.0 - 1.0
diff_0_r_unnorm = unnorm_img(diff_0_r)
diff_0_b_unnorm = unnorm_img(diff_0_b)
diff_0_g_unnorm = unnorm_img(diff_0_g)
diff_0_np = np.concatenate((diff_0_r_unnorm, diff_0_g_unnorm, diff_0_b_unnorm), axis=3)
img_batch_1_PIL = convert_np2pil(img_batch_1_unn)
img_batch_0_PIL = convert_np2pil(img_batch_0_unn)
x_z_x_1_PIL = convert_np2pil(x_z_x_1_unn)
x_z_x_0_PIL = convert_np2pil(x_z_x_0_unn)
diff_1_PIL = convert_np2pil(diff_1_np)
diff_0_PIL = convert_np2pil(diff_0_np)
score_a_1_PIL = make_score_bar(score_a_1)
score_a_0_PIL = make_score_bar(score_a_0)
wide_image_np = np.ones(((img1_h + 1) * data_num - 1, (img1_w + 1) * 8 - 1, 3), dtype=np.uint8) * 255
wide_image_PIL = Image.fromarray(wide_image_np)
for num, (ori_1, ori_0, xzx1, xzx0, diff1, diff0, score_1, score_0) in enumerate(zip(img_batch_1_PIL, img_batch_0_PIL ,x_z_x_1_PIL, x_z_x_0_PIL, diff_1_PIL, diff_0_PIL, score_a_1_PIL, score_a_0_PIL)):
wide_image_PIL.paste(ori_1, (0, num * (img1_h + 1)))
wide_image_PIL.paste(xzx1, (img1_w + 1, num * (img1_h + 1)))
wide_image_PIL.paste(diff1, ((img1_w + 1) * 2, num * (img1_h + 1)))
wide_image_PIL.paste(score_1, ((img1_w + 1) * 3, num * (img1_h + 1)))
wide_image_PIL.paste(ori_0, ((img1_w + 1) * 4, num * (img1_h + 1)))
wide_image_PIL.paste(xzx0, ((img1_w + 1) * 5, num * (img1_h + 1)))
wide_image_PIL.paste(diff0, ((img1_w + 1) * 6, num * (img1_h + 1)))
wide_image_PIL.paste(score_0, ((img1_w + 1) * 7, num * (img1_h + 1)))
wide_image_PIL.save(out_img_dir + "/resultImage_"+ log_file_name + '_' + str(epoch) + ".png ")
def make_output_img_test(img_batch_test, x_z_x_test, score_A_np_tmp, log_file_name, out_img_dir):
(data_num, img1_h, img1_w, _) = img_batch_test.shape
img_batch_test_unn = np.tile(unnorm_img(img_batch_test), (1, 1, 3))
x_z_x_test_unn = np.tile(unnorm_img(x_z_x_test), (1, 1, 3))
diff_test = img_batch_test - x_z_x_test
diff_test_r = (2.0 * np.maximum(diff_test, 0.0)) - 1.0 #(0.0, 1.0) -> (-1.0, 1.0)
diff_test_b = (2.0 * np.abs(np.minimum(diff_test, 0.0))) - 1.0 #(-1.0, 0.0) -> (1.0, 0.0) -> (1.0, -1.0)
diff_test_g = diff_test_b * 0.0 - 1.0
diff_test_r_unnorm = unnorm_img(diff_test_r)
diff_test_b_unnorm = unnorm_img(diff_test_b)
diff_test_g_unnorm = unnorm_img(diff_test_g)
diff_test_np = np.concatenate((diff_test_r_unnorm, diff_test_g_unnorm, diff_test_b_unnorm), axis=3)
img_batch_test_PIL = convert_np2pil(img_batch_test_unn)
x_z_x_test_PIL = convert_np2pil(x_z_x_test_unn)
diff_test_PIL = convert_np2pil(diff_test_np)
score_a = score_A_np_tmp[:, 1:]
#tars = score_A_np_tmp[:, 0]
score_a_PIL = make_score_bar_predict(score_A_np_tmp)
wide_image_np = np.ones(((img1_h + 1) * data_num - 1, (img1_w + 1) * 8 - 1, 3), dtype=np.uint8) * 255
wide_image_PIL = Image.fromarray(wide_image_np)
for num, (ori_test, xzx_test, diff_test, score_test) in enumerate(zip(img_batch_test_PIL, x_z_x_test_PIL, diff_test_PIL, score_a_PIL)):
wide_image_PIL.paste(ori_test, (0, num * (img1_h + 1)))
wide_image_PIL.paste(xzx_test, (img1_w + 1, num * (img1_h + 1)))
wide_image_PIL.paste(diff_test, ((img1_w + 1) * 2, num * (img1_h + 1)))
wide_image_PIL.paste(score_test, ((img1_w + 1) * 3, num * (img1_h + 1)))
wide_image_PIL.save(out_img_dir + "/resultImage_"+ log_file_name + "_test.png ")
def save_list_to_csv(list, filename):
f = open(filename, 'w')
writer = csv.writer(f, lineterminator='\n')
writer.writerows(list)
f.close()
predict_BiGAN.py
import numpy as np
import os
import tensorflow as tf
import utility as Utility
import argparse
import matplotlib.pyplot as plt
from model_BiGAN import BiGAN as Model
from make_datasets_predict import Make_datasets_predict as Make_datasets
def parser():
parser = argparse.ArgumentParser(description='train LSGAN')
parser.add_argument('--batch_size', '-b', type=int, default=300, help='Number of images in each mini-batch')
parser.add_argument('--log_file_name', '-lf', type=str, default='anpanman', help='log file name')
parser.add_argument('--epoch', '-e', type=int, default=1, help='epoch')
#parser.add_argument('--file_train_data', '-ftd', type=str, default='./mnist.npz', help='train data')
#parser.add_argument('--test_true_data', '-ttd', type=str, default='./mnist.npz', help='test of true_data')
#parser.add_argument('--test_false_data', '-tfd', type=str, default='./mnist.npz', help='test of false_data')
parser.add_argument('--test_data', '-td', type=str, default='../Test_Data/200112/', help='test of false_data')
parser.add_argument('--valid_span', '-vs', type=int, default=1, help='validation span')
parser.add_argument('--score_th', '-st', type=float, default=np.load('./score_threshold.npy'), help='validation span')
return parser.parse_args()
args = parser()
#global variants
BATCH_SIZE = args.batch_size
LOGFILE_NAME = args.log_file_name
EPOCH = args.epoch
#FILE_NAME = args.file_train_data
#TRUE_DATA = args.test_true_data
#FALSE_DATA = args.test_false_data
TEST_DATA = args.test_data
IMG_WIDTH = 100
IMG_HEIGHT = 100
IMG_CHANNEL = 1
BASE_CHANNEL = 32
NOISE_UNIT_NUM = 200
NOISE_MEAN = 0.0
NOISE_STDDEV = 1.0
TEST_DATA_SAMPLE = 5 * 5
L2_NORM = 0.001
KEEP_PROB_RATE = 0.5
SEED = 1234
SCORE_ALPHA = 0.9 # using for cost function
VALID_SPAN = args.valid_span
np.random.seed(seed=SEED)
BOARD_DIR_NAME = './tensorboard/' + LOGFILE_NAME
OUT_IMG_DIR = './out_images_BiGAN' #output image file
out_model_dir = './out_models_BiGAN/' #output model_ckpt file
#Load_model_dir = '../model_ckpt/' #Load model_ckpt file
OUT_HIST_DIR = './out_score_hist_BiGAN' #output histogram file
CYCLE_LAMBDA = 1.0
SCORE_TH = args.score_th
make_datasets = Make_datasets(TEST_DATA, IMG_WIDTH, IMG_HEIGHT, SEED)
model = Model(NOISE_UNIT_NUM, IMG_CHANNEL, SEED, BASE_CHANNEL, KEEP_PROB_RATE)
z_ = tf.placeholder(tf.float32, [None, NOISE_UNIT_NUM], name='z_') #noise to generator
x_ = tf.placeholder(tf.float32, [None, IMG_HEIGHT, IMG_WIDTH, IMG_CHANNEL], name='x_') #image to classifier
d_dis_f_ = tf.placeholder(tf.float32, [None, 1], name='d_dis_g_') #target of discriminator related to generator
d_dis_r_ = tf.placeholder(tf.float32, [None, 1], name='d_dis_r_') #target of discriminator related to real image
is_training_ = tf.placeholder(tf.bool, name = 'is_training')
with tf.variable_scope('encoder_model'):
z_enc = model.encoder(x_, reuse=False, is_training=is_training_)
with tf.variable_scope('decoder_model'):
x_dec = model.decoder(z_, reuse=False, is_training=is_training_)
x_z_x = model.decoder(z_enc, reuse=True, is_training=is_training_) # for cycle consistency
with tf.variable_scope('discriminator_model'):
#stream around discriminator
drop3_r, logits_r = model.discriminator(x_, z_enc, reuse=False, is_training=is_training_) #real pair
drop3_f, logits_f = model.discriminator(x_dec, z_, reuse=True, is_training=is_training_) #real pair
drop3_re, logits_re = model.discriminator(x_z_x, z_enc, reuse=True, is_training=is_training_) #fake pair
with tf.name_scope("loss"):
loss_dis_f = tf.reduce_mean(tf.square(logits_f - d_dis_f_), name='Loss_dis_gen') #loss related to generator
loss_dis_r = tf.reduce_mean(tf.square(logits_r - d_dis_r_), name='Loss_dis_rea') #loss related to real image
#total loss
loss_dis_total = loss_dis_f + loss_dis_r
loss_dec_total = loss_dis_f
loss_enc_total = loss_dis_r
with tf.name_scope("score"):
l_g = tf.reduce_mean(tf.abs(x_ - x_z_x), axis=(1,2,3))
l_FM = tf.reduce_mean(tf.abs(drop3_r - drop3_re), axis=1)
score_A = SCORE_ALPHA * l_g + (1.0 - SCORE_ALPHA) * l_FM
with tf.name_scope("optional_loss"):
loss_dec_opt = loss_dec_total + CYCLE_LAMBDA * l_g
loss_enc_opt = loss_enc_total + CYCLE_LAMBDA * l_g
tf.summary.scalar('loss_dis_total', loss_dis_total)
tf.summary.scalar('loss_dec_total', loss_dec_total)
tf.summary.scalar('loss_enc_total', loss_enc_total)
merged = tf.summary.merge_all()
# t_vars = tf.trainable_variables()
dec_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope="decoder")
enc_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope="encoder")
dis_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope="discriminator")
with tf.name_scope("train"):
train_dis = tf.train.AdamOptimizer(learning_rate=0.00005, beta1=0.5).minimize(loss_dis_total, var_list=dis_vars
, name='Adam_dis')
train_dec = tf.train.AdamOptimizer(learning_rate=0.005, beta1=0.5).minimize(loss_dec_total, var_list=dec_vars
, name='Adam_dec')
train_enc = tf.train.AdamOptimizer(learning_rate=0.005, beta1=0.5).minimize(loss_enc_total, var_list=enc_vars
, name='Adam_enc')
train_dec_opt = tf.train.AdamOptimizer(learning_rate=0.005, beta1=0.5).minimize(loss_dec_opt, var_list=dec_vars
, name='Adam_dec')
train_enc_opt = tf.train.AdamOptimizer(learning_rate=0.005, beta1=0.5).minimize(loss_enc_opt, var_list=enc_vars
, name='Adam_enc')
sess = tf.Session()
ckpt = tf.train.get_checkpoint_state(out_model_dir)
saver = tf.train.Saver()
if ckpt: #S'il y a un point de contrôle
last_model = ckpt.model_checkpoint_path #Chemin vers le dernier modèle enregistré
saver.restore(sess, last_model) #Lecture de données variables
print("load " + last_model)
else: #Lorsqu'il n'y a pas de données enregistrées
#init = tf.initialize_all_variables()
sess.run(tf.global_variables_initializer())
summary_writer = tf.summary.FileWriter(BOARD_DIR_NAME, sess.graph)
log_list = []
log_list.append(['epoch', 'AUC'])
#training loop
for epoch in range(1):
if epoch % VALID_SPAN == 0:
score_A_np = np.zeros((0, 2), dtype=np.float32)
val_data_num = len(make_datasets.valid_data)
img_batch_test = make_datasets.get_valid_data_for_1_batch(0, val_data_num)
score_A_ = sess.run(score_A, feed_dict={x_:img_batch_test, is_training_:False})
score_A_re = np.reshape(score_A_, (-1, 1))
tars_batch_re = np.where(score_A_re < SCORE_TH, 1, 0) #np.reshape(tars_batch, (-1, 1))
score_A_np_tmp = np.concatenate((score_A_re, tars_batch_re), axis=1)
x_z_x_test = sess.run(x_z_x, feed_dict={x_:img_batch_test, is_training_:False})
#print(score_A_np_tmp)
array_1_np, array_0_np = Utility.score_divide(score_A_np_tmp)
Utility.make_score_hist_test(array_1_np, array_0_np, SCORE_TH, LOGFILE_NAME, OUT_HIST_DIR)
Utility.make_output_img_test(img_batch_test, x_z_x_test, score_A_np_tmp, LOGFILE_NAME, OUT_IMG_DIR)
make_datasets_predict.py
import numpy as np
import os
import glob
import re
import random
#import cv2
from PIL import Image
from keras.preprocessing import image
class Make_datasets_predict():
def __init__(self, test_data, img_width, img_height, seed):
self.filename = test_data
self.img_width = img_width
self.img_height = img_height
self.seed = seed
x_test = self.read_DATASET(self.filename)
self.valid_data = x_test
random.seed(self.seed)
np.random.seed(self.seed)
def read_DATASET(self, test_path):
test_list = os.listdir(test_path)
x_test = np.empty((0, self.img_width*self.img_height))
for img in test_list:
path_name = test_path+img
x_img = Image.open(path_name)
#Alignez la taille
x_img = x_img.resize((self.img_width, self.img_height))
#Convertir 3 canaux en 1 canal
x_img= x_img.convert('L')
# PIL.Image.De l'image au tableau numpy
x_img = np.array(x_img)
#Normalisation
x_img = x_img / 255.0
#Ajouter un axe
x_img = x_img.reshape((1,self.img_width, self.img_height))
# flatten
x_img = x_img.reshape(1, self.img_width*self.img_height)
x_test = np.concatenate([x_test, x_img], axis = 0)
print("x_test.shape, ", x_test.shape)
return x_test
def get_file_names(self, dir_name):
target_files = []
for root, dirs, files in os.walk(dir_name):
targets = [os.path.join(root, f) for f in files]
target_files.extend(targets)
return target_files
def divide_MNIST_by_digit(self, train_np, data1_num, data2_num):
data_1 = train_np[train_np[:,0] == data1_num]
data_2 = train_np[train_np[:,0] == data2_num]
return data_1, data_2
def read_data(self, d_y_np, width, height):
#tars = []
images = []
for num, d_y_1 in enumerate(d_y_np):
image = d_y_1.reshape(width, height, 1)
#tar = d_y_1[0]
images.append(image)
#tars.append(tar)
return np.asarray(images)#, np.asarray(tars)
def normalize_data(self, data):
# data0_2 = data / 127.5
# data_norm = data0_2 - 1.0
data_norm = (data * 2.0) - 1.0 #applied for tanh
return data_norm
def make_data_for_1_epoch(self):
self.filename_1_epoch = np.random.permutation(self.train_np)
return len(self.filename_1_epoch)
def get_data_for_1_batch(self, i, batchsize):
filename_batch = self.filename_1_epoch[i:i + batchsize]
images, _ = self.read_data(filename_batch, self.img_width, self.img_height)
images_n = self.normalize_data(images)
return images_n
def get_valid_data_for_1_batch(self, i, batchsize):
filename_batch = self.valid_data[i:i + batchsize]
images = self.read_data(filename_batch, self.img_width, self.img_height)
images_n = self.normalize_data(images)
return images_n#, tars
def make_random_z_with_norm(self, mean, stddev, data_num, unit_num):
norms = np.random.normal(mean, stddev, (data_num, unit_num))
# tars = np.zeros((data_num, 1), dtype=np.float32)
return norms
def make_target_1_0(self, value, data_num):
if value == 0.0:
target = np.zeros((data_num, 1), dtype=np.float32)
elif value == 1.0:
target = np.ones((data_num, 1), dtype=np.float32)
else:
print("target value error")
return target
https://github.com/YousukeAnai/Dic_Graduation_Assignment
https://qiita.com/masataka46/items/49dba2790fa59c29126b https://qiita.com/underfitting/items/a0cbb035568dea33b2d7
Recommended Posts