[PYTHON] Reinforcement learning: Accelerate Value Iteration

Introduction

In recent years, the field of model-free deep reinforcement learning has been actively studied due to the success of AlphaGo and DQN. These algorithms are one of the effective approaches when the state-action space is large in circumstances or when mathematical modeling of dynamics is difficult. However, among the problems that are encountered in reality, it is relatively easy to mathematically model the environment, and there are many cases where the state-action space can be reduced by devising. For such problems, I think that using model-based table reinforcement learning has a great advantage in terms of development and operation costs.

However, the size of the state action space that can be handled in table reinforcement learning largely depends on the speed of the program, and speeding up is very important. Therefore, in this article, we will introduce the know-how for executing the __value iterative method __, which is the basic algorithm of reinforcement learning, at high speed. In the end, we were able to achieve __500 times faster __ than the naive implementation.

background

Markov decision process

Markov Decision Processes (MDP) is a framework used in reinforcement learning problem setting. The "environment" takes a state $ s $ at each time, and the decision-making "agent" arbitrarily selects the action $ a $ available in that state. The environment then randomly transitions to a new state, at which time the agent receives a reward of $ r $ corresponding to the state transition. The basic problem setting in MDP is to find a measure that is a mapping (probability distribution) to the optimal action taken by an agent in a certain state. Then, if the objective function is a discounted cumulative reward, the problem of finding the optimal value function is as follows.

\pi^* = \text{arg}\max_{\pi}  \text{E}_{s \sim \mu, \tau \sim \pi}[\sum _t \gamma^t r(s_t, a_t)|s_0=s] = \text{arg}\max_{\pi}  \text{E}_{s \sim \mu} [V_{\pi}(s)]

The state value function $ V (s) $ and the action value function $ Q (s, a) $ are defined as follows.

V_{\pi}(s) = \text{E}_{\tau \sim \pi}[\sum _t \gamma^t r(s_t, a_t)|s_0=s] \\
Q_{\pi}(s, a) = \text{E}_{\tau \sim \pi}[\sum _t \gamma^t r(s_t, a_t)|s_0=s, a_0=a]

Value iterative method

Optimal policy $ V ^ * (s) = V_ {\ pi ^ *} (s) $ for the state value function in $ \ pi ^ * $, $ Q ^ * (s, a) = Q_ {\ for the action value function Define it as pi ^ *} (s, a) $. From the definition of the value function, we can see that the optimal value function satisfies the following Bellman equation.

Q^*(s, a) = \sum_{s', r} p(s', r|s, a) [r + \gamma V^*(s')] \\
V^*(s) = \max _a Q^*(s, a)

Value Iteration is an algorithm that starts with an appropriate initial value and repeatedly adapts the Bellman equation to update $ V $ and $ Q $ alternately. The worst computational complexity is a polynomial for the number of states and the number of actions. If the action value function $ Q ^ * (s, a) $ can be obtained, the optimum measure can be obtained by selecting the action $ a $ that can be taken in the state $ s $ and having the highest action value. I can do it.

\pi(a|s) = \text{arg}\max _a Q^*(s,a)

algorithm

Experiment preparation

The value iterative method is very simple as an idea, but there are many possible implementations. Here, I would like to introduce some implementations and their processing speeds in detail. Create an experimental MDP for comparison of each implementation. For ease of implementation, consider a deterministic MDP, a world in which the reward $ r $ and the next state $ s'$ are deterministically determined for an action $ a $. At this time, the Bellman equation is simplified as follows.

Q^*(s, a) = r + \gamma V^*(s')

Deterministic MDP can be represented by a graph with states as nodes, actions as edges, and rewards as edge weights (attributes). The following function creates the MDP to be used in the experiment.

import networkx as nx
import random

def create_mdp(num_states, num_actions, reward_ratio=0.01, neighbor_count=30):
    get_reward = lambda: 1.0 if random.random() < reward_ratio else 0.0
    get_neighbor = lambda u: random.randint(u - neighbor_count, u + neighbor_count) % (num_states - 1)
    edges = [
        (i, (i + 1) % (num_states - 1), get_reward())
        for i in range(num_states)
    ]
    for _ in range(num_states * (num_actions - 1)):
        u = random.randint(0, num_states - 1)
        v = get_neighbor(u)
        r = get_reward()
        edges.append((u, v, r))
    G = nx.DiGraph()
    G.add_weighted_edges_from(edges)
    return G

We specify the number of states and the number of actions (the average number of actions in each state), and generate a random strongly connected graph and sparse reward. It is represented by DiGraph where node of networkx is state, edge is action, and weight attribute of edge is reward.

In future experiments, we will use MDP with 10000 states and an average of 3 actions in each state.

num_states = 10000
num_actions = 3
G = create_mdp(num_states, num_actions)

Implementation of naive value iterative method

The simplest algorithm is a method of repeating "update of the state value of all states" and "update of the action value of all actions", and is sometimes called Synchronous Dynamic Programming.

class NonConvergenceError(Exception):
    pass

class SyncDP:
    
    def __init__(self, G, gamma, max_sweeps, threshold):
        self.G = G
        self.gamma = gamma
        self.max_sweeps = max_sweeps
        self.threshold = threshold
        self.V = {state : 0 for state in G.nodes}
        self.TD = {state : 0 for state in G.nodes}
        self.Q = {(state, action) : 0 for state, action in G.edges}

    def get_reward(self, s, a):
        return self.G.edges[s, a]['weight']

    def sweep(self):
        for state in self.G.nodes:
            for action in self.G.successors(state):
                self.Q[state, action] = self.get_reward(state, action) + self.gamma * self.V[action]
        for state in self.G.nodes:
            v_new = max([self.Q[state, action] for action in self.G.successors(state)])
            self.TD[state] = abs(self.V[state] - v_new)
            self.V[state] = v_new

    def run(self):
        for _ in range(self.max_sweeps):
            self.sweep()
            if (np.array(list(self.TD.values())) < self.threshold).all():
                return self.V
        raise NonConvergenceError

The parameters discount rate gamma, convergence threshold threshold, and maximum number of sweeps are originally determined by the requirements of the application, but here, set appropriate values.

gamma = 0.95
threshold = 0.01
max_sweeps = 1000

When executed under this condition, the processing time is as follows.

%timeit V = SyncDP(G, gamma, max_sweeps, threshold).run()
8.83 s ± 273 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

If the number of states is 10000 and it takes this much time, the applications that can be used are likely to be quite limited. This processing time will be the baseline for future improvements.

Asynchronous Dynamic Programming (Async DP) Since the synchronous value update algorithm updates all states in one sweep, a single sweep can take a considerable amount of time if the number of states is very large. Asynchronous DP iteratively updates the state value on the impression. In other words, instead of preparing a new array to store the updated values every time like synchronous DP and updating all state values and storing them in the new array, calculate by then with each update. Repeat the value update, taking advantage of the different state values available. It is necessary to keep updating all states in order to guarantee convergence, but the update order can be freely selected. For example, you can speed up value updates by skipping conditions that are less relevant to the best policy.

Prioritized Sweeping For asynchronous DP, the order of value updates can be arbitrary. When updating value, not all states are equally useful in updating the value of other states, and it is expected that some states will have a significant impact on the value of other states. For example, in an MDP that can obtain sparse rewards as we are thinking about, it is important to efficiently propagate the rewarded state to other states. Therefore, the following algorithm using the priority queue can be considered.

  1. Priority queue manages the amount of change due to value update in all states
  2. Update the value of the state at the top of the queue
  3. If the amount of change from the last value update exceeds the threshold, push the state / change amount pair to the queue.

This algorithm is called Prioritized Sweeping. The implementation looks like this:

import heapq

class PrioritizedDP(SyncDP):
    def run(self):
        self.sweep()
        pq = [
            (-abs(td_error), state)
            for state, td_error in self.TD.items()
            if abs(td_error) > self.threshold
            ]
        heapq.heapify(pq)
        while pq:
            _, action = heapq.heappop(pq)
            if self.TD[action] < self.threshold:
                continue
            self.TD[action] = 0
            for state in self.G.predecessors(action):
                self.Q[state, action] = self.get_reward(state, action) + self.gamma * self.V[action]
                v_new = max([self.Q[state, action] for action in self.G.successors(state)])
                td_error = abs(v_new - self.V[state])
                self.TD[state] += td_error
                if td_error > self.threshold:
                    heapq.heappush(pq, (-td_error, state))
                self.V[state] = v_new
        return self.V

First, the sweep function is used to update the value of all states, and the heap is constructed based on the state in which the TD error exceeds the threshold value. After that, repeat the update until the queue is exhausted. First, update the action value $ Q $ so that the state taken out of the queue becomes the next state (= action). Then update $ V $ for the state value that depends on the updated action value. If the difference between before and after update (td_error) exceeds the threshold, it will be pushed to the queue. The processing time is as follows, and we were able to achieve about twice the speed.

%timeit V = PrioritizedDP(G, gamma, max_sweeps, threshold).run()
4.06 s ± 115 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Vector calculation

So far we have implemented networkx object-based algorithms, but frequently called methods such as successors take most of the time. Let's think about how to utilize vector operation by numpy while making the data structure more efficient. By expressing the graph as a numpy array, it becomes possible to use vector operations for action value operations as follows.

class ArraySyncDP:
    
    def __init__(self, A : ArrayGraph, gamma, max_sweeps, threshold):
        self.A = A
        self.gamma = gamma
        self.max_sweeps = max_sweeps
        self.threshold = threshold
        self.V = np.zeros(A.num_states)
        self.TD = np.full(A.num_states, np.inf)
        self.Q = np.zeros(A.num_actions)

    def run(self):
        for _ in range(self.max_sweeps):
            self.Q[:] = self.A.reward + self.gamma * self.V[self.A.action2next_state]
            for state_id in range(self.A.num_states):
                start, end = self.A.state2action_start[state_id], self.A.state2action_start[state_id + 1]
                v_new = self.Q[start : end].max()
                self.TD[state_id] = abs(self.V[state_id] - v_new)
                self.V[state_id] = v_new

            if (self.TD < self.threshold).all():
                return self.V
        raise NonConvergenceError
%timeit V = ArraySyncDP(A, gamma, max_sweeps, threshold).run()
3.5 s ± 99.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

It's slightly faster than Prioritized Sweeping, but it's not very effective. It seems that the main reason is that not all can be vector-operated and the for minutes are still used for updating the state value.

Cython Arrays should be faster accessible because they keep their basic data types in a contiguous area of memory, compared to lists that refer to objects scattered in memory. However, Python seems to be slower to access elements than lists and dictionaries because it converts to Python objects to reference individual elements of the array, which incurs overhead.

So let's think about using Cython to speed up array access. Cython is a compiler that transforms type-annotated Python into compiled extension modules. The converted extension module can be loaded by import in the same way as a normal Python module. If you use Cython, it seems that you can speed up the process of element access using the numpy array as an interface for the following reasons.

Implement asynchronous DP in Cython. I'd like to use a priority queue, but I'm only skipping state value updates with a small TD error because of the processing of Python objects.

%%cython
import numpy as np
cimport numpy as np
cimport cython

ctypedef np.float64_t FLOAT_t
ctypedef np.int64_t INT_t

@cython.boundscheck(False)
@cython.wraparound(False)
def cythonic_backup(
    FLOAT_t[:] V, FLOAT_t[:] Q, FLOAT_t[:] TD, FLOAT_t[:] reward,
    INT_t[:] state2action_start, INT_t[:] action2next_state,
    INT_t[:] next_state2inv_action, INT_t[:, :] inv_action2state_action,
    FLOAT_t gamma, FLOAT_t threshold
):
    cdef INT_t num_updates, state, action, next_state, inv_action, start_inv_action, end_inv_action, start_action, end_action
    cdef FLOAT_t v
    num_updates = 0
    for next_state in range(len(V)):
        if TD[next_state] < threshold:
            continue
            
        num_updates += 1
        TD[next_state] = 0
        start_inv_action = next_state2inv_action[next_state]
        end_inv_action = next_state2inv_action[next_state + 1]
        for inv_action in range(start_inv_action, end_inv_action):
            state = inv_action2state_action[inv_action][0]
            action = inv_action2state_action[inv_action][1]
            Q[action] = reward[action] + gamma * V[next_state]
            start_action = state2action_start[state]
            end_action = state2action_start[state + 1]
            v = -1e9
            for action in range(start_action, end_action):
                if v < Q[action]:
                    v = Q[action]
            if v > V[state]:
                TD[state] += v - V[state]
            else:
                TD[state] += V[state] - v
            V[state] = v
    return num_updates
class CythonicAsyncDP(ArraySyncDP):
    def run(self):
        A = self.A
        for i in range(self.max_sweeps):
            num_updates = cythonic_backup(
                self.V, self.Q, self.TD, A.reward, A.state2action_start, A.action2next_state,
                A.next_state2inv_action, A.inv_action2state_action, self.gamma, self.threshold
            )
            if num_updates == 0:
                return self.V
        raise NonConvergenceError
%timeit V = CythonicAsyncDP(A, gamma, max_sweeps, threshold).run()
18.6 ms ± 947 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

A dramatic speedup of 18.6ms was achieved. With this processing speed, depending on the intended use, it is possible to handle problems with a number of states of about 1 million.

References

[1] High Performance Python, Micha Gorelick and Ian Ozsvald, O'Reilly Japan, 2015. [2] Reinforcement Learning, R. S. Sutton and A. G. Barto, The MIT Press, 2018.

Recommended Posts

Reinforcement learning: Accelerate Value Iteration
Future reinforcement learning_2
Future reinforcement learning_1
Reinforcement learning 1 Python installation
Reinforcement learning 3 OpenAI installation
Reinforcement learning for tic-tac-toe
[Reinforcement learning] Bandit task
Python + Unity Reinforcement Learning (Learning)
Reinforcement learning 1 introductory edition
I tried Value Iteration Networks
Reinforcement learning 7 Learning data log output
Play with reinforcement learning with MuZero
Reinforcement learning 28 colaboratory + OpenAI + chainerRL
Reinforcement learning 19 Colaboratory + Mountain_car + ChainerRL
Reinforcement learning 2 Installation of chainerrl
[Reinforcement learning] Tracking by multi-agent
Reinforcement learning 6 First Chainer RL
Reinforcement learning starting with Python
Reinforcement learning 20 Colaboratory + Pendulum + ChainerRL
Reinforcement learning 5 Try programming CartPole?
Reinforcement learning 9 ChainerRL magic remodeling
Reinforcement learning Learn from today
Reinforcement learning 4 CartPole first step
Deep Reinforcement Learning 1 Introduction to Reinforcement Learning
DeepMind Reinforcement Learning Framework Acme
Reinforcement learning 21 Colaboratory + Pendulum + ChainerRL + A2C
TF2RL: Reinforcement learning library for TensorFlow2.x
Reinforcement learning 34 Make continuous Agent videos
Reinforcement learning 13 Try Mountain_car with ChainerRL.
Python + Unity Reinforcement learning environment construction
Reinforcement learning 22 Colaboratory + CartPole + ChainerRL + A3C
Explore the maze with reinforcement learning
Reinforcement learning 8 Try using Chainer UI
Reinforcement learning 24 Colaboratory + CartPole + ChainerRL + ACER
Reinforcement learning 3 Dynamic programming / TD method
Deep Reinforcement Learning 3 Practical Edition: Breakout
I tried reinforcement learning using PyBrain
Learn while making! Deep reinforcement learning_1