[PYTHON] Verstärktes Lernen: Beschleunigen Sie die Wertiteration

Einführung

In den letzten Jahren wurde aufgrund des Erfolgs von AlphaGo und DQN das Gebiet des modellfreien Lernens zur Tiefenverstärkung aktiv untersucht. Diese Algorithmen sind einer der effektiven Ansätze, wenn der Zustandsaktionsraum unter Umständen groß ist oder wenn die mathematische Modellierung der Dynamik schwierig ist. Unter den Problemen, die in der Realität auftreten, ist es jedoch relativ einfach, die Umgebung mathematisch zu modellieren, und es gibt viele Fälle, in denen der Zustandsaktionsraum durch Entwickeln reduziert werden kann. Für solche Probleme denke ich, dass die Verwendung eines modellbasierten Lernens zur Stärkung der Tabelle einen großen Vorteil in Bezug auf Entwicklungs- und Betriebskosten hat.

Die Größe des staatlichen Aktionsraums, der beim Lernen zur Stärkung der Tabelle verwendet werden kann, hängt jedoch stark von der Geschwindigkeit des Programms ab, und die Beschleunigung ist sehr wichtig. Daher werden wir in diesem Artikel das Know-how für die schnelle Ausführung der __Wert-Iterationsmethode __ vorstellen, die der grundlegende Algorithmus für das Verstärkungslernen ist. Am Ende konnten wir __500 mal schneller __ erreichen als die naive Implementierung.

Hintergrund

Markov Entscheidungsprozess

Markov Decision Processes (MDP) ist ein Framework, das bei der Problemstellung des verstärkenden Lernens verwendet wird. Die "Umgebung" nimmt jedes Mal einen Zustand $ s $ an, und der "Agent" der Entscheidungsfindung wählt willkürlich die Aktion $ a $ aus, die in diesem Zustand verfügbar ist. Danach wechselt die Umgebung zufällig in einen neuen Status. Zu diesem Zeitpunkt erhält der Agent eine Belohnung von $ r $, die dem Statusübergang entspricht. Die grundlegende Problemeinstellung in MDP besteht darin, ein Maß zu finden, das eine Zuordnung (Wahrscheinlichkeitsverteilung) zu der optimalen Aktion darstellt, die ein Agent in einem bestimmten Zustand ausführt. Wenn dann die Zielfunktion die kumulative Rabattbelohnung ist, ist das Problem des Findens der optimalen Wertfunktion wie folgt.

\pi^* = \text{arg}\max_{\pi}  \text{E}_{s \sim \mu, \tau \sim \pi}[\sum _t \gamma^t r(s_t, a_t)|s_0=s] = \text{arg}\max_{\pi}  \text{E}_{s \sim \mu} [V_{\pi}(s)]

Die Zustandswertfunktion $ V (s) $ und die Aktionswertfunktion $ Q (s, a) $ sind wie folgt definiert.

V_{\pi}(s) = \text{E}_{\tau \sim \pi}[\sum _t \gamma^t r(s_t, a_t)|s_0=s] \\
Q_{\pi}(s, a) = \text{E}_{\tau \sim \pi}[\sum _t \gamma^t r(s_t, a_t)|s_0=s, a_0=a]

Wertwiederholungsmethode

Optimale Richtlinie $ V ^ * (s) = V_ {\ pi ^ *} (s) $ für die Statuswertfunktion in $ \ pi ^ * $, $ Q ^ * (s, a) = Q_ {\ für die Aktionswertfunktion Definiert als pi ^ *} (s, a) $. Aus der Definition der Wertfunktion können wir erkennen, dass die optimale Wertfunktion die folgende Bellman-Gleichung erfüllt.

Q^*(s, a) = \sum_{s', r} p(s', r|s, a) [r + \gamma V^*(s')] \\
V^*(s) = \max _a Q^*(s, a)

Die Wertiteration ist ein Algorithmus, der mit einem geeigneten Anfangswert beginnt und die Belman-Gleichung wiederholt anpasst, um $ V $ und $ Q $ abwechselnd zu aktualisieren. Der schlechteste Berechnungsbetrag ist ein Polypoly für die Anzahl der Zustände und die Anzahl der Aktionen. Wenn die Aktionswertfunktion $ Q ^ * (s, a) $ erhalten werden kann, kann das optimale Maß erhalten werden, indem die Aktion $ a $ ausgewählt wird, die im Zustand $ s $ ausgeführt werden kann und den höchsten Aktionswert aufweist. Ich kann es schaffen

\pi(a|s) = \text{arg}\max _a Q^*(s,a)

Algorithmus

Versuchsvorbereitung

Die Wertiterationsmethode ist eine sehr einfache Idee, es gibt jedoch viele mögliche Implementierungen. Hier möchte ich einige Implementierungen und deren Verarbeitungsgeschwindigkeiten im Detail vorstellen. Erstellen Sie ein experimentelles MDP zum Vergleich jeder Implementierung. Betrachten Sie zur Vereinfachung der Implementierung ein deterministisches MDP, eine Welt, in der die Belohnung $ r $ und der nächste Zustand $ s '$ für eine Aktion $ a $ deterministisch bestimmt werden. Zu diesem Zeitpunkt wird die Belman-Gleichung wie folgt vereinfacht.

Q^*(s, a) = r + \gamma V^*(s')

Das deterministische MDP kann durch ein Diagramm dargestellt werden, in dem der Zustand ein Knoten, die Aktion eine Kante und die Belohnung ein Kantengewicht (Attribut) ist. Die folgende Funktion erstellt das MDP, das im Experiment verwendet werden soll.

import networkx as nx
import random

def create_mdp(num_states, num_actions, reward_ratio=0.01, neighbor_count=30):
    get_reward = lambda: 1.0 if random.random() < reward_ratio else 0.0
    get_neighbor = lambda u: random.randint(u - neighbor_count, u + neighbor_count) % (num_states - 1)
    edges = [
        (i, (i + 1) % (num_states - 1), get_reward())
        for i in range(num_states)
    ]
    for _ in range(num_states * (num_actions - 1)):
        u = random.randint(0, num_states - 1)
        v = get_neighbor(u)
        r = get_reward()
        edges.append((u, v, r))
    G = nx.DiGraph()
    G.add_weighted_edges_from(edges)
    return G

Die Anzahl der Zustände und die Anzahl der Aktionen (die durchschnittliche Anzahl der Aktionen in jedem Zustand) werden angegeben, und ein zufälliger, stark verbundener Graph und eine spärliche Belohnung werden erzeugt. Es wird durch DiGraph dargestellt, wobei der Knoten von networkx der Status ist, die Kante die Aktion ist und das Gewichtsattribut der Kante die Belohnung ist.

In zukünftigen Experimenten werden wir MDP mit 10000 Zuständen und durchschnittlich 3 Aktionen für jeden Zustand verwenden.

num_states = 10000
num_actions = 3
G = create_mdp(num_states, num_actions)

Implementierung einer naiven Wertiterationsmethode

Der einfachste Algorithmus ist eine Methode zum Wiederholen von "Aktualisieren des Statuswerts aller Zustände" und "Aktualisieren des Aktionswerts aller Aktionen" und wird manchmal als synchrone dynamische Programmierung bezeichnet.

class NonConvergenceError(Exception):
    pass

class SyncDP:
    
    def __init__(self, G, gamma, max_sweeps, threshold):
        self.G = G
        self.gamma = gamma
        self.max_sweeps = max_sweeps
        self.threshold = threshold
        self.V = {state : 0 for state in G.nodes}
        self.TD = {state : 0 for state in G.nodes}
        self.Q = {(state, action) : 0 for state, action in G.edges}

    def get_reward(self, s, a):
        return self.G.edges[s, a]['weight']

    def sweep(self):
        for state in self.G.nodes:
            for action in self.G.successors(state):
                self.Q[state, action] = self.get_reward(state, action) + self.gamma * self.V[action]
        for state in self.G.nodes:
            v_new = max([self.Q[state, action] for action in self.G.successors(state)])
            self.TD[state] = abs(self.V[state] - v_new)
            self.V[state] = v_new

    def run(self):
        for _ in range(self.max_sweeps):
            self.sweep()
            if (np.array(list(self.TD.values())) < self.threshold).all():
                return self.V
        raise NonConvergenceError

Die Parameter Abzinsungsrate Gamma, Konvergenzschwellenschwelle und maximale Anzahl von Sweeps werden ursprünglich durch die Anforderungen der Anwendung bestimmt, stellen hier jedoch geeignete Werte ein.

gamma = 0.95
threshold = 0.01
max_sweeps = 1000

Bei Ausführung unter dieser Bedingung ist die Verarbeitungszeit wie folgt.

%timeit V = SyncDP(G, gamma, max_sweeps, threshold).run()
8.83 s ± 273 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Wenn die Anzahl der Zustände 10000 beträgt und dies so lange dauert, sind die Anwendungen, die verwendet werden können, wahrscheinlich recht begrenzt. Diese Verarbeitungszeit wird die Basis für zukünftige Verbesserungen sein.

Asynchronous Dynamic Programming (Async DP) Da der Algorithmus zur Aktualisierung synchroner Werte alle Zustände in einem Durchlauf aktualisiert, kann ein einzelner Durchlauf eine beträchtliche Zeit in Anspruch nehmen, wenn die Anzahl der Zustände sehr groß ist. Asynchroner DP aktualisiert iterativ den Statuswert der Impression. Mit anderen Worten, anstatt ein neues Array vorzubereiten, um jedes Mal aktualisierte Werte wie synchrones DP zu speichern und alle Statuswerte zu aktualisieren und sie im neuen Array zu speichern, berechnen Sie bis dahin mit jeder Aktualisierung. Wiederholen Sie die Wertaktualisierungen und nutzen Sie dabei die verschiedenen verfügbaren Statuswerte. Es ist notwendig, alle Zustände ständig zu aktualisieren, um die Konvergenz sicherzustellen, aber die Aktualisierungsreihenfolge kann frei gewählt werden. Sie können beispielsweise Wertaktualisierungen beschleunigen, indem Sie Bedingungen überspringen, die für die beste Richtlinie weniger relevant sind.

Prioritized Sweeping Bei asynchronem DP kann die Reihenfolge der Wertaktualisierungen beliebig sein. Beim Aktualisieren des Werts sind nicht alle Zustände gleichermaßen nützlich, um den Wert anderer Zustände zu aktualisieren, und es wird erwartet, dass einige Zustände einen signifikanten Einfluss auf den Wert anderer Zustände haben. In MDP, wo Sie beispielsweise die spärliche Belohnung erhalten, über die Sie nachdenken, ist es wichtig, den belohnten Zustand effizient auf andere Zustände zu übertragen. Daher kann der folgende Algorithmus unter Verwendung einer Prioritätswarteschlange in Betracht gezogen werden.

  1. Verwalten Sie den Änderungsbetrag aufgrund der Wertaktualisierung in allen Status mit einer Prioritätswarteschlange
  2. Aktualisieren Sie den Wert des Status oben in der Warteschlange
  3. Wenn der Änderungsbetrag seit der letzten Wertaktualisierung den Schwellenwert überschreitet, verschieben Sie das Paar aus Status und Änderungsbetrag in die Warteschlange.

Dieser Algorithmus wird als priorisiertes Sweeping bezeichnet. Die Implementierung sieht folgendermaßen aus:

import heapq

class PrioritizedDP(SyncDP):
    def run(self):
        self.sweep()
        pq = [
            (-abs(td_error), state)
            for state, td_error in self.TD.items()
            if abs(td_error) > self.threshold
            ]
        heapq.heapify(pq)
        while pq:
            _, action = heapq.heappop(pq)
            if self.TD[action] < self.threshold:
                continue
            self.TD[action] = 0
            for state in self.G.predecessors(action):
                self.Q[state, action] = self.get_reward(state, action) + self.gamma * self.V[action]
                v_new = max([self.Q[state, action] for action in self.G.successors(state)])
                td_error = abs(v_new - self.V[state])
                self.TD[state] += td_error
                if td_error > self.threshold:
                    heapq.heappush(pq, (-td_error, state))
                self.V[state] = v_new
        return self.V

Zunächst wird die Sweep-Funktion verwendet, um den Wert aller Zustände zu aktualisieren, und der Heap wird basierend auf dem Zustand erstellt, in dem der TD-Fehler den Schwellenwert überschreitet. Wiederholen Sie danach die Aktualisierung, bis die Warteschlange erschöpft ist. Aktualisieren Sie zunächst den Aktionswert $ Q $, sodass der aus der Warteschlange entnommene Status zum nächsten Status (= Aktion) wird. Aktualisieren Sie dann $ V $ für den Statuswert, der vom aktualisierten Aktionswert abhängt. Wenn der Unterschied zwischen vor und nach dem Update (td_error) den Schwellenwert überschreitet, wird er in die Warteschlange verschoben. Die Verarbeitungszeit ist wie folgt und wir konnten ungefähr die doppelte Geschwindigkeit erreichen.

%timeit V = PrioritizedDP(G, gamma, max_sweeps, threshold).run()
4.06 s ± 115 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Vektorberechnung

Bisher haben wir objektbasierte Networkx-Algorithmen implementiert, aber häufig genannte Methoden wie Nachfolger nehmen die meiste Zeit in Anspruch. Lassen Sie uns überlegen, wie Sie die Vektorberechnung nach Numpy verwenden und gleichzeitig die Datenstruktur effizienter gestalten können. Durch Ausdrücken des Diagramms als Numpy-Array wird es möglich, die Vektorberechnung für die Aktionswertberechnung wie folgt zu verwenden.

class ArraySyncDP:
    
    def __init__(self, A : ArrayGraph, gamma, max_sweeps, threshold):
        self.A = A
        self.gamma = gamma
        self.max_sweeps = max_sweeps
        self.threshold = threshold
        self.V = np.zeros(A.num_states)
        self.TD = np.full(A.num_states, np.inf)
        self.Q = np.zeros(A.num_actions)

    def run(self):
        for _ in range(self.max_sweeps):
            self.Q[:] = self.A.reward + self.gamma * self.V[self.A.action2next_state]
            for state_id in range(self.A.num_states):
                start, end = self.A.state2action_start[state_id], self.A.state2action_start[state_id + 1]
                v_new = self.Q[start : end].max()
                self.TD[state_id] = abs(self.V[state_id] - v_new)
                self.V[state_id] = v_new

            if (self.TD < self.threshold).all():
                return self.V
        raise NonConvergenceError
%timeit V = ArraySyncDP(A, gamma, max_sweeps, threshold).run()
3.5 s ± 99.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Es ist etwas schneller als Priorized Sweeping, aber nicht sehr effektiv. Es scheint, dass der Hauptgrund darin besteht, dass wir nicht alles in Vektoren konvertieren konnten und immer noch Minuten verwenden, um den Statuswert zu aktualisieren.

Cython Von Natur aus halten Arrays ihre grundlegenden Datentypen in zusammenhängenden Speicherbereichen, sodass sie schneller zugänglich sein sollten als Listen, die auf im Speicher verstreute Objekte verweisen. Python scheint jedoch langsamer auf Elemente zuzugreifen als Listen und Wörterbücher, da es in Python-Objekte konvertiert wird, um auf einzelne Elemente des Arrays zu verweisen, was zu Overhead führt.

Denken wir also über die Verwendung von Cython nach, um den Array-Zugriff zu beschleunigen. Cython ist ein Compiler, der typbeschriftetes Python in eine kompilierte Erweiterung konvertiert. Das konvertierte Erweiterungsmodul kann wie ein normales Python-Modul durch Import geladen werden. Wenn Sie Cython verwenden, können Sie den Prozess des Elementzugriffs mithilfe des numpy-Arrays als Schnittstelle aus den folgenden Gründen beschleunigen.

Implementieren Sie asynchrones DP in Cython. Ich möchte eine Prioritätswarteschlange verwenden, überspringe jedoch aufgrund der Verarbeitung von Python-Objekten nur Statuswertaktualisierungen mit einem kleinen TD-Fehler.

%%cython
import numpy as np
cimport numpy as np
cimport cython

ctypedef np.float64_t FLOAT_t
ctypedef np.int64_t INT_t

@cython.boundscheck(False)
@cython.wraparound(False)
def cythonic_backup(
    FLOAT_t[:] V, FLOAT_t[:] Q, FLOAT_t[:] TD, FLOAT_t[:] reward,
    INT_t[:] state2action_start, INT_t[:] action2next_state,
    INT_t[:] next_state2inv_action, INT_t[:, :] inv_action2state_action,
    FLOAT_t gamma, FLOAT_t threshold
):
    cdef INT_t num_updates, state, action, next_state, inv_action, start_inv_action, end_inv_action, start_action, end_action
    cdef FLOAT_t v
    num_updates = 0
    for next_state in range(len(V)):
        if TD[next_state] < threshold:
            continue
            
        num_updates += 1
        TD[next_state] = 0
        start_inv_action = next_state2inv_action[next_state]
        end_inv_action = next_state2inv_action[next_state + 1]
        for inv_action in range(start_inv_action, end_inv_action):
            state = inv_action2state_action[inv_action][0]
            action = inv_action2state_action[inv_action][1]
            Q[action] = reward[action] + gamma * V[next_state]
            start_action = state2action_start[state]
            end_action = state2action_start[state + 1]
            v = -1e9
            for action in range(start_action, end_action):
                if v < Q[action]:
                    v = Q[action]
            if v > V[state]:
                TD[state] += v - V[state]
            else:
                TD[state] += V[state] - v
            V[state] = v
    return num_updates
class CythonicAsyncDP(ArraySyncDP):
    def run(self):
        A = self.A
        for i in range(self.max_sweeps):
            num_updates = cythonic_backup(
                self.V, self.Q, self.TD, A.reward, A.state2action_start, A.action2next_state,
                A.next_state2inv_action, A.inv_action2state_action, self.gamma, self.threshold
            )
            if num_updates == 0:
                return self.V
        raise NonConvergenceError
%timeit V = CythonicAsyncDP(A, gamma, max_sweeps, threshold).run()
18.6 ms ± 947 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Eine dramatische Beschleunigung von 18,6 ms wurde erreicht. Mit dieser Verarbeitungsgeschwindigkeit können je nach Verwendungszweck auch Probleme mit etwa 1 Million Staaten angemessen behandelt werden.

Verweise

[1] Hochleistungs-Python, Micha Gorelick und Ian Ozsvald, O'Reilly Japan, 2015. [2] Reinforcement Learning, R. S. Sutton and A. G. Barto, The MIT Press, 2018.

Recommended Posts

Verstärktes Lernen: Beschleunigen Sie die Wertiteration
Zukünftiges Verstärkungslernen_2
Zukünftiges Verstärkungslernen_1
Erweitertes Lernen 1 Python-Installation
Stärkung des Lernens 3 OpenAI-Installation
Stärkung des Lernens der dritten Zeile
[Lernen stärken] Banditenaufgabe
Python + Unity Enhanced Learning (Lernen)
Stärkung des Lernens 1 Einführungsausgabe
Ich habe Value Iteration Networks ausprobiert
Verbessertes Lernen 7 Protokollieren Sie die Ausgabe von Lerndaten
Stärkung des Lernens 28 colaboratory + OpenAI + chainerRL
Stärkung des Lernens 19 Colaboratory + Mountain_car + ChainerRL
Stärkung des Lernens 2 Installation von Chainerrl
[Lernen stärken] Tracking durch Multi-Agent
Stärkung des Lernens 6 First Chainer RL
Verbessertes Lernen ab Python
Stärkung des Lernens 20 Colaboratory + Pendulum + ChainerRL
Verstärkungslernen 5 Versuchen Sie, CartPole zu programmieren?
Verstärkungslernen 9 ChainerRL Magic Remodeling
Lernen stärken Lernen Sie von heute
Stärkung des Lernens 4 CartPole erster Schritt
Tiefe Stärkung des Lernens 1 Einführung in die Stärkung des Lernens
DeepMind Enhanced Learning Framework Acme
Stärkung des Lernens 21 Labor + Pendel + ChainerRL + A2C
TF2RL: Erweiterte Lernbibliothek für TensorFlow2.x
Verstärkungslernen 34 Erstellen Sie fortlaufende Agentenvideos
Lernen stärken 13 Probieren Sie Mountain_car mit ChainerRL aus.
Python + Unity Verbesserte Erstellung von Lernumgebungen
Stärkung des Lernens 22 Colaboratory + CartPole + ChainerRL + A3C
Entdecken Sie das Labyrinth mit erweitertem Lernen
Stärkung des Lernens 8 Versuchen Sie, die Chainer-Benutzeroberfläche zu verwenden
Stärkung des Lernens 24 Colaboratory + CartPole + ChainerRL + ACER
Verstärkungslernen 3 Dynamische Planungsmethode / TD-Methode
Deep Strengthing Learning 3 Praktische Ausgabe: Block Breaking
Ich habe versucht, mit PyBrain verstärkt zu lernen
Lerne beim Machen! Tiefes Verstärkungslernen_1