Continuing from the previous article.
This time I would like to classify images using ResNet, a type of convolutional neural network.
ResNet builds on the convolutional neural networks covered in the previous article. It was announced by Microsoft in 2015 and was later used in AlphaGo, the Go AI that defeated the world champion of Go.
Using such a proven algorithm, I would like to classify images just as in the previous article.
The more convolutional layers a network has, the more easily it can recognize complex features. However, overfitting also becomes more likely, and simply adding layers often worsens the results. ResNet addresses this issue by adding a shortcut structure called the residual block.
A residual block creates a detour route called a shortcut connection around the convolution layers, so that once further learning in those layers is unnecessary, the signal can skip the convolutions and move on to the next layer.
In the regression article I used Early Stopping to stop training before the specified number of epochs when no improvement was seen during learning; you can think of the shortcut connection as doing something similar within the convolution layers.
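As a minimal sketch of the idea (hypothetical layer sizes; Conv2D and Add come from tensorflow.keras.layers, imported further below, and the input x is assumed to already have 64 channels so the addition works):
def simple_residual_block(x):
    sc = x  # shortcut: keep the input as-is
    y = Conv2D(64, 3, padding='same', activation='relu')(x)
    y = Conv2D(64, 3, padding='same')(y)
    return Add()([y, sc])  # output = F(x) + shortcut; the block can fall back to the identity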
ResNet has two architectures, Plain and Bottleneck.
This time, we will use an architecture called Bottleneck to make classification predictions.
The residual block in the Bottleneck architecture consists of three convolutions. If the first convolution has x kernels of size y, the structure is:
1st: x kernels, kernel size y
2nd: x kernels, kernel size 3y
3rd: 4x kernels, kernel size y
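For example, taking x = 64 kernels and y = 1 (a hypothetical illustration using the conv helper defined later in this article), the three convolutions of one bottleneck block would be:
x = conv(64, 1)(x)   # 1st: x kernels, kernel size y
x = conv(64, 3)(x)   # 2nd: x kernels, kernel size 3y
x = conv(256, 1)(x)  # 3rd: 4x kernels, kernel size y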
This article uses TensorFlow 1.13.1, so install it with the following command.
!pip install tensorflow==1.13.1
Once installed, import the library.
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.callbacks import LearningRateScheduler
from tensorflow.keras.layers import Activation, Add, BatchNormalization, Conv2D, Dense, GlobalAveragePooling2D, Input
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.regularizers import l2
from tensorflow.keras.utils import to_categorical
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
The data is the same CIFAR-10 as in the previous CNN article. Because the loss used below is categorical_crossentropy, the integer labels are also one-hot encoded with to_categorical.
(train_images, train_labels), (test_images, test_labels) = cifar10.load_data()
# One-hot encode the labels for categorical_crossentropy
train_labels = to_categorical(train_labels)
test_labels = to_categorical(test_labels)
Because this model has complicated branches, it is built with the network structure called the Functional API rather than Sequential.
With the Functional API, instead of adding layers to the model itself with model.add as in Sequential, you create each layer, pass the previous layer's output to it, and finally pass the input and output to Model.
Specifically, you write as follows. __* Since this is only an example of the Functional API, the code below is not used in this program.__
from tensorflow.keras.layers import MaxPool2D  # used only in this example
input = Input(shape=(28, 28, 1))  # input layer (Conv2D expects an image-shaped input)
x = Conv2D(64, (3, 3), activation='relu', padding='same')(input)  # pass the input layer in the trailing ()
x = MaxPool2D(pool_size=(2, 2))(x)  # pass the previous layer (x) in the trailing ()
model = Model(inputs=input, outputs=x)  # build the model
Where the Sequential model added layers such as Conv2D with model.add, in the Functional API you instead pass the previous layer in the () at the end of the next layer.
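For comparison, here is a sketch of the same two layers in the Sequential style (the input shape is hypothetical):
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPool2D
model = Sequential()
model.add(Conv2D(64, (3, 3), activation='relu', padding='same', input_shape=(28, 28, 1)))
model.add(MaxPool2D(pool_size=(2, 2)))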
This time we will use convolution layers repeatedly, so let's wrap the layer creation in a function named conv.
def conv(filters, kernel_size, strides=1):
    return Conv2D(filters, kernel_size, strides=strides, padding='same', use_bias=False,
                  kernel_initializer='he_normal', kernel_regularizer=l2(0.0001))
filters: number of kernels
kernel_size: kernel size
strides: stride width (how far the kernel moves each step when creating the feature map)
padding: padding method ('same' keeps the output the same size as the input)
use_bias: whether to use a bias term
kernel_initializer: initializer for the weight matrix; he_normal draws from a normal distribution scaled for ReLU layers (He initialization)
kernel_regularizer: regularization penalty applied to the weights; here L2 regularization
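Note that conv only returns a configured Conv2D layer; applying it to a tensor is a separate step:
layer = conv(64, 3)  # create a 3x3 convolution layer with 64 kernels
x = layer(x)         # apply it to the previous layer's output
# or in one line, as used throughout this article:
x = conv(64, 3)(x)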
This time I will create three kinds of residual blocks with different numbers of kernels and execute each kind 18 times.
The processing differs slightly between the first residual block and the second and subsequent ones, so for clarity I will define the first block separately from the rest.
# Residual block executed the first time
def first_residual_unit(filters, strides):
    def f(x):
        # →BN→ReLU
        x = BatchNormalization()(x)
        b = Activation('relu')(x)
        # Convolution layer →BN→ReLU
        x = conv(filters // 4, 1, strides)(b)
        x = BatchNormalization()(x)
        x = Activation('relu')(x)
        # Convolution layer →BN→ReLU
        x = conv(filters // 4, 3)(x)
        x = BatchNormalization()(x)
        x = Activation('relu')(x)
        # Convolution layer
        x = conv(filters, 1)(x)
        # Adjust the shape of the shortcut
        sc = conv(filters, 1, strides)(b)
        # Add
        return Add()([x, sc])
    return f
I will explain the code of the residual block.
x = BatchNormalization()(x) normalizes the activations, which stabilizes and speeds up learning.
There are two patterns for where to use it: at the very beginning of the residual block, and sandwiched between a convolution layer and its activation function. It is said that BatchNormalization should not be combined with Dropout, so Dropout is not used this time.
Then b = Activation('relu')(x) applies the activation function.
Next come the convolution layers; if no further learning were needed at this point, the signal could jump straight to Add. This is the shortcut structure described above.
From x = conv(filters // 4, 1, strides)(b) onward are the convolution layers. filters // 4 is the number of kernels divided by 4 (integer division), which comes from the Bottleneck architecture.
Comparing the three convolution layers with the Bottleneck pattern, you can see they follow it exactly, with the third convolution restoring the full number of kernels:
__If the first convolution has x kernels of size y, the second has x kernels of size 3y, and the third has 4x kernels of size y.__
sc = conv(filters, 1, strides)(b) adjusts the shortcut to the same shape as x so the two can be added. Finally the sum is returned with return, and the first residual block is finished.
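To see how the first residual block changes the tensor shape, here is a small hypothetical check (the 16-channel input matches the stem convolution used later in the model):
# Hypothetical shape check of the first residual block
inp = Input(shape=(32, 32, 16))
out = first_residual_unit(64, strides=1)(inp)
print(out.shape)  # (?, 32, 32, 64): channels go 16 -> 64, spatial size is kept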
Next is the residual block executed from the second time onward.
It does not need a strides argument, and adjusting the shape of the shortcut is unnecessary because that was already done in the first block. The rest of the processing is the same as the first time.
# Residual block executed from the second time onward
def residual_unit(filters):
    def f(x):
        sc = x
        # →BN→ReLU
        x = BatchNormalization()(x)
        x = Activation('relu')(x)
        # Convolution layer →BN→ReLU
        x = conv(filters // 4, 1)(x)
        x = BatchNormalization()(x)
        x = Activation('relu')(x)
        # Convolution layer →BN→ReLU
        x = conv(filters // 4, 3)(x)
        x = BatchNormalization()(x)
        x = Activation('relu')(x)
        # Convolution layer
        x = conv(filters, 1)(x)
        # Add
        return Add()([x, sc])
    return f
Now that we have one function for the residual block executed the first time and another for the second and subsequent times, let's combine the two into a function that executes the specified number of residual blocks.
# Function that executes the specified number of residual blocks
def residual_block(filters, strides, unit_size):
    """
    filters: number of kernels
    strides: stride width
    unit_size: number of residual blocks to execute
    """
    def f(x):
        # Residual block executed the first time
        x = first_residual_unit(filters, strides)(x)
        # Residual blocks from the second time onward
        # (one has already run, so subtract 1 from unit_size)
        for i in range(unit_size - 1):
            x = residual_unit(filters)(x)
        return x
    return f
Now that the preparations are complete, let's create the model according to the flow below.
Since the output of GlobalAveragePooling2D is already one-dimensional, there is no need to convert it with Flatten before the fully connected layer.
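As a quick illustration (the shape matches the output of the last residual block in this model):
# GlobalAveragePooling2D averages each 8x8 feature map to a single value
t = Input(shape=(8, 8, 256))
print(GlobalAveragePooling2D()(t).shape)  # (?, 256): already one-dimensional
With that in mind, here is the model.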
# Input data shape (input layer)
input = Input(shape=(32, 32, 3))
# Convolution layer (pass the input)
x = conv(16, 3)(input)
# Residual blocks: 18 × 3
x = residual_block(64, 1, 18)(x)
x = residual_block(128, 2, 18)(x)
x = residual_block(256, 2, 18)(x)
# →BN→ReLU
x = BatchNormalization()(x)
x = Activation('relu')(x)
# Pooling layer
x = GlobalAveragePooling2D()(x)
# Fully connected layer
output = Dense(10, activation='softmax', kernel_regularizer=l2(0.0001))(x)
# Create the model
model = Model(inputs=input, outputs=output)
If you want to convert the model to a TPU model, use the following code (this is for TensorFlow 1.13.1).
import tensorflow as tf
import os
tpu_model = tf.contrib.tpu.keras_to_tpu_model(
    model,
    strategy=tf.contrib.tpu.TPUDistributionStrategy(
        tf.contrib.cluster_resolver.TPUClusterResolver(tpu='grpc://' + os.environ['COLAB_TPU_ADDR'])
    )
)
tpu_model.compile(loss='categorical_crossentropy', optimizer=SGD(momentum=0.9), metrics=['acc'])
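If you are not using a TPU, skip the conversion and compile the plain Keras model the same way (then replace tpu_model with model in the rest of the code):
model.compile(loss='categorical_crossentropy', optimizer=SGD(momentum=0.9), metrics=['acc'])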
Normalize the image data with ImageDataGenerator and augment (inflate) the images; the augmentation is applied to the training data only.
After defining the generators, call fit so that the statistics needed for normalization are computed.
With 50,000 training images you might wonder whether augmentation is necessary, but since there are only 5,000 images per class, it is better to augment.
# Normalization and augmentation of the training data
train_gen = ImageDataGenerator(
    featurewise_center=True,
    featurewise_std_normalization=True,
    width_shift_range=0.125,
    height_shift_range=0.125,
    horizontal_flip=True)
# Normalization of the test data (augmentation would be meaningless here, so none)
test_gen = ImageDataGenerator(
    featurewise_center=True,
    featurewise_std_normalization=True)
# Pre-compute the statistics of the whole dataset
# (both generators use statistics from the training images)
for data in (train_gen, test_gen):
    data.fit(train_images)
Now that we have created the model and preprocessed the data, we can start training.
This time I will use LearningRateScheduler to change the learning rate depending on the epoch: 0.1 up to epoch 79, 0.01 for epochs 80 to 119, and 0.001 from epoch 120 onward. Lowering the learning rate in steps like this shortens the training time.
# Preparing the LearningRateScheduler
def step_decay(epoch):
    x = 0.1
    if epoch >= 80: x = 0.01
    if epoch >= 120: x = 0.001
    return x
lr_decay = LearningRateScheduler(step_decay)
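As a quick check of the schedule (Keras passes a 0-indexed epoch number to the scheduler):
print(step_decay(0), step_decay(79))    # 0.1 0.1
print(step_decay(80), step_decay(119))  # 0.01 0.01
print(step_decay(120))                  # 0.001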
Now perform the training. Because we are feeding the augmented images from a generator, fit_generator takes a few extra options.
# Training
batch_size = 128
history = tpu_model.fit_generator(
    train_gen.flow(train_images, train_labels, batch_size=batch_size),
    epochs=100,
    steps_per_epoch=train_images.shape[0] // batch_size,
    validation_data=test_gen.flow(test_images, test_labels, batch_size=batch_size),
    validation_steps=test_images.shape[0] // batch_size,
    callbacks=[lr_decay])
Training takes about an hour. When it finishes, display the training history as a graph.
plt.plot(history.history['acc'], label='acc')
plt.plot(history.history['val_acc'], label='val_acc')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(loc='best')
plt.show()
Let's evaluate the model on test_images. The previous CNN achieved an accuracy of 71.2%. How does ResNet compare?
batch_size = 128
test_loss, test_acc = tpu_model.evaluate_generator(
    test_gen.flow(test_images, test_labels, batch_size=batch_size),
    steps=10)
print('loss: {:.3f}\nacc: {:.3f}'.format(test_loss, test_acc))
The accuracy is 93.4%! A dramatic improvement over the plain CNN.
Let's also check the prediction results for the first few images, just in case.
# Display the images to be inferred
for i in range(16):
    plt.subplot(2, 8, i+1)
    plt.imshow(test_images[i])
plt.show()
# Display the inferred labels
test_predictions = tpu_model.predict_generator(
    test_gen.flow(test_images[0:16], shuffle=False, batch_size=16),
    steps=1)  # all 16 images fit in one batch, so a single step is enough
test_predictions = np.argmax(test_predictions, axis=1)
labels = ['airplane', 'automobile', 'bird', 'cat', 'deer',
          'dog', 'frog', 'horse', 'ship', 'truck']
print([labels[n] for n in test_predictions])
Finally, save the model built on Google Colaboratory and download it to your local environment with the code below.
# Save the model
tpu_model.save('resnet.h5')
# Download the model to your local environment
from google.colab import files
files.download('resnet.h5')
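To reuse the downloaded model later, it can be loaded back with load_model (assuming the same TensorFlow/Keras version):
from tensorflow.keras.models import load_model
model = load_model('resnet.h5')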
This has been a long article, but that's it for this time.