[PYTHON] Challenge DQN (Modoki) with Chainer ✕ OpenAI Gym!

Introduction

I made a program to solve the Classic control of OpenAI Gym with Deep Q Network, commonly known as DQN, which combines deep learning and reinforcement learning. This time I would like to introduce the implementation.

About DQN itself

-History of DQN + Deep Q-Network written in Chainer -Reinforcement learning from zero to deep

The article is very easy to understand, and I implemented it with reference to the papers and GitHub code introduced here. If you want to know the theory of reinforcement learning and DQN, please refer to here.

DQN "Modoki"?

As you can see from the name Deep Q Network, DQN functionally approximates Q-learning, which is one of reinforcement learning, with a multi-layer neural network. In addition to that, it seems that it can be called DQN only after incorporating the following three methods.

  1. Experience Replay
  2. Fixed Target Q-Network
  3. Reward Clipping

The method I implemented this time was only 1 and 2, and I did not clip the reward of 3. So to be precise, it's not DQN. So I chose DQN "Modoki".

About OpenAI Gym

An open source platform that makes it easy to build an environment for reinforcement learning. It is a python library and

$ pip install gym

Easy to install with. For details, please see the Official Site.

Implementation

Now, I would like to introduce the implemented code. Some of the code shown here is omitted, so please check the whole from here.

neural network

I implemented it using Chainer. 100 units are 3 layers, and the activation function is Leaky ReLU.

class Neuralnet(Chain):

    def __init__(self, n_in, n_out):
        super(Neuralnet, self).__init__(
            L1 = L.Linear(n_in, 100),
            L2 = L.Linear(100, 100),
            L3 = L.Linear(100, 100),
            Q_value = L.Linear(100, n_out, initialW=np.zeros((n_out, 100), dtype=np.float32))
        )

    def Q_func(self, x):
        h = F.leaky_relu(self.L1(x))
        h = F.leaky_relu(self.L2(h))
        h = F.leaky_relu(self.L3(h))
        h = self.Q_value(h)
        return h

Agent

Implementation of the agent part of reinforcement learning.

parameter settings

It defines the parameters for reinforcement learning. The neural network introduced earlier is also defined according to the number of read states and the number of actions. And for Fixed Target Q-Network, make a deep copy of the created Q function. In other words, there are two Q functions. At first, I had a hard time understanding this part ...

class Agent():

    def __init__(self, n_st, n_act, seed):
        self.n_act = n_act
        self.model = Neuralnet(n_st, n_act)
        self.target_model = copy.deepcopy(self.model)
        self.optimizer = optimizers.Adam()
        self.optimizer.setup(self.model)
        self.memory = deque()
        self.loss = 0
        self.step = 0
 self.gamma = 0.99 # discount rate
 self.mem_size = 1000 #Experience Number of experiences to remember for Replay
 self.batch_size = 100 #Mini-batch size for Experience Replay
 self.train_freq = 10 # Neural network learning interval
 self.target_update_freq = 20 # Target network synchronization interval
        # ε-greedy
 self.epsilon = 1 # initial value of ε
 self.epsilon_decay = 0.005 # Attenuation value of ε
 self.epsilon_min = 0 # Minimum value of ε
 self.exploration = 1000 # Number of steps to start decaying ε (this time until memory is accumulated)

Accumulation of experience

For Experience Replay

  1. State: st
  2. Action: act
  3. Reward: r
  4. Next state: st_dash
  5. End of episode: ep_end

The five elements of are tupled as experience and stored in memory. If it exceeds the memory size defined at the beginning, it will be discarded in a tokoroten style from the one put in first. At first, memory was just a list, but I heard that there is a deque that can append and pop at both ends with constraints, so I used that.

def stock_experience(self, st, act, r, st_dash, ep_end):
    self.memory.append((st, act, r, st_dash, ep_end))
    if len(self.memory) > self.mem_size:
        self.memory.popleft()

Experience Replay

This is the implementation part of Experience Replay, which is one of the important techniques in DQN. Shuffle the stored memory, cut it out in the defined mini-batch size, and learn.

def suffle_memory(self):
    mem = np.array(self.memory)
    return np.random.permutation(mem)

def parse_batch(self, batch):
    st, act, r, st_dash, ep_end = [], [], [], [], []
    for i in xrange(self.batch_size):
        st.append(batch[i][0])
        act.append(batch[i][1])
        r.append(batch[i][2])
        st_dash.append(batch[i][3])
        ep_end.append(batch[i][4])
    st = np.array(st, dtype=np.float32)
    act = np.array(act, dtype=np.int8)
    r = np.array(r, dtype=np.float32)
    st_dash = np.array(st_dash, dtype=np.float32)
    ep_end = np.array(ep_end, dtype=np.bool)
    return st, act, r, st_dash, ep_end

def experience_replay(self):
    mem = self.suffle_memory()
    perm = np.array(xrange(len(mem)))
    for start in perm[::self.batch_size]:
        index = perm[start:start+self.batch_size]
        batch = mem[index]
        st, act, r, st_d, ep_end = self.parse_batch(batch)
        self.model.zerograds()
        loss = self.forward(st, act, r, st_d, ep_end)
        loss.backward()
        self.optimizer.update()

Q function update part

This is the update part of the Q function using the neural network. It is important to use the copied Q function (self.target_model.Q_func) in the part that calculates the maximum Q value in the next state (st_dash).

def forward(self, st, act, r, st_dash, ep_end):
    s = Variable(st)
    s_dash = Variable(st_dash)
    Q = self.model.Q_func(s)
    tmp = self.target_model.Q_func(s_dash)
    tmp = list(map(np.max, tmp.data))
    max_Q_dash = np.asanyarray(tmp, dtype=np.float32)
    target = np.asanyarray(copy.deepcopy(Q.data), dtype=np.float32)
    for i in xrange(self.batch_size):
        target[i, act[i]] = r[i] + (self.gamma * max_Q_dash[i]) * (not ep_end[i])
    loss = F.mean_squared_error(Q, Variable(target))
    return loss

When calculating loss here, it seems that learning will be faster if the difference between the Q value and target is clipped to -1 to 1, but I could not implement it because I could not understand the theory due to lack of study (I'm sorry ...

Return action

It is a part that returns the action to be taken when it is input according to the learned Q function. The method of action selection uses ε-greedy.

def get_action(self, st):
    if np.random.rand() < self.epsilon:
        return np.random.randint(0, self.n_act)
    else:
        s = Variable(st)
        Q = self.model.Q_func(s)
        Q = Q.data[0]
        a = np.argmax(Q)
        return np.asarray(a, dtype=np.int8)

Advance learning

This is the part where learning proceeds when enough memory is accumulated. The step is ticked every time, and the Q function for target is synchronized at regular intervals. Also, after completing the search to some extent, ε will be attenuated at each step.

def reduce_epsilon(self):
    if self.epsilon > self.epsilon_min and self.exploration < self.step:
        self.epsilon -= self.epsilon_decay

def train(self):
    if len(self.memory) >= self.mem_size:
        if self.step % self.train_freq == 0:
            self.experience_replay()
            self.reduce_epsilon()
        if self.step % self.target_update_freq == 0:
            self.target_model = copy.deepcopy(self.model)
    self.step += 1

Execution part

I tried to make it so that if you enter the environment name of Classic control, it will judge the number of states and the number of actions without permission It may have become a little messy and difficult to understand ^^;

def main(env_name):
    env = gym.make(env_name)
    view_path = "./video/" + env_name

    n_st = env.observation_space.shape[0]
    if type(env.action_space) == gym.spaces.discrete.Discrete:
        # CartPole-v0, Acrobot-v0, MountainCar-v0
        n_act = env.action_space.n
        action_list = range(0, n_act)
    elif type(env.action_space) == gym.spaces.box.Box:
        # Pendulum-v0
        action_list = [np.array([a]) for a in [-2.0, 2.0]]
        n_act = len(action_list)

    agent = Agent(n_st, n_act, seed)

    env.monitor.start(view_path, video_callable=None, force=True, seed=seed)
    for i_episode in xrange(1000):
        observation = env.reset()
        for t in xrange(200):
            env.render()
            state = observation.astype(np.float32).reshape((1,n_st))
            act_i = agent.get_action(state)
            action = action_list[act_i]
            observation, reward, ep_end, _ = env.step(action)
            state_dash = observation.astype(np.float32).reshape((1,n_st))
            agent.stock_experience(state, act_i, reward, state_dash, ep_end)
            agent.train()
            if ep_end:
                break
    env.monitor.close()

Execution result

I uploaded the results to OpenAI Gym

I think Acrobot and Pendulum are pretty good results, but Cart Pole is subtle. It seems that the results will vary depending on the update frequency of the Q function for target, the magnitude of attenuation of ε, and the optimization method. interesting!

in conclusion

I want to try it with Atari games in the future. At that time, it seems necessary to consider the reward Clipping. Should I consider normalization and DropOut?

Recommended Posts

Challenge DQN (Modoki) with Chainer ✕ OpenAI Gym!
Solve OpenAI Gym Copy-v0 with Sarsa
OpenAI Gym to learn with PD-controlled Cart Pole
Seq2Seq (1) with chainer
Create an OpenAI Gym environment with bash on Windows 10
Use tensorboard with Chainer
Reinforcement learning in the shortest time with Keras with OpenAI Gym