In den letzten Jahren wurde aufgrund des Erfolgs von AlphaGo und DQN das Gebiet des modellfreien Lernens zur Tiefenverstärkung aktiv untersucht. Diese Algorithmen sind einer der effektiven Ansätze, wenn der Zustandsaktionsraum unter Umständen groß ist oder wenn die mathematische Modellierung der Dynamik schwierig ist. Unter den Problemen, die in der Realität auftreten, ist es jedoch relativ einfach, die Umgebung mathematisch zu modellieren, und es gibt viele Fälle, in denen der Zustandsaktionsraum durch Entwickeln reduziert werden kann. Für solche Probleme denke ich, dass die Verwendung eines modellbasierten Lernens zur Stärkung der Tabelle einen großen Vorteil in Bezug auf Entwicklungs- und Betriebskosten hat.
Die Größe des staatlichen Aktionsraums, der beim Lernen zur Stärkung der Tabelle verwendet werden kann, hängt jedoch stark von der Geschwindigkeit des Programms ab, und die Beschleunigung ist sehr wichtig. Daher werden wir in diesem Artikel das Know-how für die schnelle Ausführung der __Wert-Iterationsmethode __ vorstellen, die der grundlegende Algorithmus für das Verstärkungslernen ist. Am Ende konnten wir __500 mal schneller __ erreichen als die naive Implementierung.
Markov Decision Processes (MDP) ist ein Framework, das bei der Problemstellung des verstärkenden Lernens verwendet wird. Die "Umgebung" nimmt jedes Mal einen Zustand $ s $ an, und der "Agent" der Entscheidungsfindung wählt willkürlich die Aktion $ a $ aus, die in diesem Zustand verfügbar ist. Danach wechselt die Umgebung zufällig in einen neuen Status. Zu diesem Zeitpunkt erhält der Agent eine Belohnung von $ r $, die dem Statusübergang entspricht. Die grundlegende Problemeinstellung in MDP besteht darin, ein Maß zu finden, das eine Zuordnung (Wahrscheinlichkeitsverteilung) zu der optimalen Aktion darstellt, die ein Agent in einem bestimmten Zustand ausführt. Wenn dann die Zielfunktion die kumulative Rabattbelohnung ist, ist das Problem des Findens der optimalen Wertfunktion wie folgt.
\pi^* = \text{arg}\max_{\pi} \text{E}_{s \sim \mu, \tau \sim \pi}[\sum _t \gamma^t r(s_t, a_t)|s_0=s] = \text{arg}\max_{\pi} \text{E}_{s \sim \mu} [V_{\pi}(s)]
Die Zustandswertfunktion $ V (s) $ und die Aktionswertfunktion $ Q (s, a) $ sind wie folgt definiert.
V_{\pi}(s) = \text{E}_{\tau \sim \pi}[\sum _t \gamma^t r(s_t, a_t)|s_0=s] \\
Q_{\pi}(s, a) = \text{E}_{\tau \sim \pi}[\sum _t \gamma^t r(s_t, a_t)|s_0=s, a_0=a]
Optimale Richtlinie $ V ^ * (s) = V_ {\ pi ^ *} (s) $ für die Statuswertfunktion in $ \ pi ^ * $, $ Q ^ * (s, a) = Q_ {\ für die Aktionswertfunktion Definiert als pi ^ *} (s, a) $. Aus der Definition der Wertfunktion können wir erkennen, dass die optimale Wertfunktion die folgende Bellman-Gleichung erfüllt.
Q^*(s, a) = \sum_{s', r} p(s', r|s, a) [r + \gamma V^*(s')] \\
V^*(s) = \max _a Q^*(s, a)
Die Wertiteration ist ein Algorithmus, der mit einem geeigneten Anfangswert beginnt und die Belman-Gleichung wiederholt anpasst, um $ V $ und $ Q $ abwechselnd zu aktualisieren. Der schlechteste Berechnungsbetrag ist ein Polypoly für die Anzahl der Zustände und die Anzahl der Aktionen. Wenn die Aktionswertfunktion $ Q ^ * (s, a) $ erhalten werden kann, kann das optimale Maß erhalten werden, indem die Aktion $ a $ ausgewählt wird, die im Zustand $ s $ ausgeführt werden kann und den höchsten Aktionswert aufweist. Ich kann es schaffen
\pi(a|s) = \text{arg}\max _a Q^*(s,a)
Die Wertiterationsmethode ist eine sehr einfache Idee, es gibt jedoch viele mögliche Implementierungen. Hier möchte ich einige Implementierungen und deren Verarbeitungsgeschwindigkeiten im Detail vorstellen. Erstellen Sie ein experimentelles MDP zum Vergleich jeder Implementierung. Betrachten Sie zur Vereinfachung der Implementierung ein deterministisches MDP, eine Welt, in der die Belohnung $ r $ und der nächste Zustand $ s '$ für eine Aktion $ a $ deterministisch bestimmt werden. Zu diesem Zeitpunkt wird die Belman-Gleichung wie folgt vereinfacht.
Q^*(s, a) = r + \gamma V^*(s')
Das deterministische MDP kann durch ein Diagramm dargestellt werden, in dem der Zustand ein Knoten, die Aktion eine Kante und die Belohnung ein Kantengewicht (Attribut) ist. Die folgende Funktion erstellt das MDP, das im Experiment verwendet werden soll.
import networkx as nx
import random
def create_mdp(num_states, num_actions, reward_ratio=0.01, neighbor_count=30):
get_reward = lambda: 1.0 if random.random() < reward_ratio else 0.0
get_neighbor = lambda u: random.randint(u - neighbor_count, u + neighbor_count) % (num_states - 1)
edges = [
(i, (i + 1) % (num_states - 1), get_reward())
for i in range(num_states)
]
for _ in range(num_states * (num_actions - 1)):
u = random.randint(0, num_states - 1)
v = get_neighbor(u)
r = get_reward()
edges.append((u, v, r))
G = nx.DiGraph()
G.add_weighted_edges_from(edges)
return G
Die Anzahl der Zustände und die Anzahl der Aktionen (die durchschnittliche Anzahl der Aktionen in jedem Zustand) werden angegeben, und ein zufälliger, stark verbundener Graph und eine spärliche Belohnung werden erzeugt. Es wird durch DiGraph dargestellt, wobei der Knoten von networkx der Status ist, die Kante die Aktion ist und das Gewichtsattribut der Kante die Belohnung ist.
In zukünftigen Experimenten werden wir MDP mit 10000 Zuständen und durchschnittlich 3 Aktionen für jeden Zustand verwenden.
num_states = 10000
num_actions = 3
G = create_mdp(num_states, num_actions)
Der einfachste Algorithmus ist eine Methode zum Wiederholen von "Aktualisieren des Statuswerts aller Zustände" und "Aktualisieren des Aktionswerts aller Aktionen" und wird manchmal als synchrone dynamische Programmierung bezeichnet.
class NonConvergenceError(Exception):
pass
class SyncDP:
def __init__(self, G, gamma, max_sweeps, threshold):
self.G = G
self.gamma = gamma
self.max_sweeps = max_sweeps
self.threshold = threshold
self.V = {state : 0 for state in G.nodes}
self.TD = {state : 0 for state in G.nodes}
self.Q = {(state, action) : 0 for state, action in G.edges}
def get_reward(self, s, a):
return self.G.edges[s, a]['weight']
def sweep(self):
for state in self.G.nodes:
for action in self.G.successors(state):
self.Q[state, action] = self.get_reward(state, action) + self.gamma * self.V[action]
for state in self.G.nodes:
v_new = max([self.Q[state, action] for action in self.G.successors(state)])
self.TD[state] = abs(self.V[state] - v_new)
self.V[state] = v_new
def run(self):
for _ in range(self.max_sweeps):
self.sweep()
if (np.array(list(self.TD.values())) < self.threshold).all():
return self.V
raise NonConvergenceError
Die Parameter Abzinsungsrate Gamma, Konvergenzschwellenschwelle und maximale Anzahl von Sweeps werden ursprünglich durch die Anforderungen der Anwendung bestimmt, stellen hier jedoch geeignete Werte ein.
gamma = 0.95
threshold = 0.01
max_sweeps = 1000
Bei Ausführung unter dieser Bedingung ist die Verarbeitungszeit wie folgt.
%timeit V = SyncDP(G, gamma, max_sweeps, threshold).run()
8.83 s ± 273 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Wenn die Anzahl der Zustände 10000 beträgt und dies so lange dauert, sind die Anwendungen, die verwendet werden können, wahrscheinlich recht begrenzt. Diese Verarbeitungszeit wird die Basis für zukünftige Verbesserungen sein.
Asynchronous Dynamic Programming (Async DP) Da der Algorithmus zur Aktualisierung synchroner Werte alle Zustände in einem Durchlauf aktualisiert, kann ein einzelner Durchlauf eine beträchtliche Zeit in Anspruch nehmen, wenn die Anzahl der Zustände sehr groß ist. Asynchroner DP aktualisiert iterativ den Statuswert der Impression. Mit anderen Worten, anstatt ein neues Array vorzubereiten, um jedes Mal aktualisierte Werte wie synchrones DP zu speichern und alle Statuswerte zu aktualisieren und sie im neuen Array zu speichern, berechnen Sie bis dahin mit jeder Aktualisierung. Wiederholen Sie die Wertaktualisierungen und nutzen Sie dabei die verschiedenen verfügbaren Statuswerte. Es ist notwendig, alle Zustände ständig zu aktualisieren, um die Konvergenz sicherzustellen, aber die Aktualisierungsreihenfolge kann frei gewählt werden. Sie können beispielsweise Wertaktualisierungen beschleunigen, indem Sie Bedingungen überspringen, die für die beste Richtlinie weniger relevant sind.
Prioritized Sweeping Bei asynchronem DP kann die Reihenfolge der Wertaktualisierungen beliebig sein. Beim Aktualisieren des Werts sind nicht alle Zustände gleichermaßen nützlich, um den Wert anderer Zustände zu aktualisieren, und es wird erwartet, dass einige Zustände einen signifikanten Einfluss auf den Wert anderer Zustände haben. In MDP, wo Sie beispielsweise die spärliche Belohnung erhalten, über die Sie nachdenken, ist es wichtig, den belohnten Zustand effizient auf andere Zustände zu übertragen. Daher kann der folgende Algorithmus unter Verwendung einer Prioritätswarteschlange in Betracht gezogen werden.
Dieser Algorithmus wird als priorisiertes Sweeping bezeichnet. Die Implementierung sieht folgendermaßen aus:
import heapq
class PrioritizedDP(SyncDP):
def run(self):
self.sweep()
pq = [
(-abs(td_error), state)
for state, td_error in self.TD.items()
if abs(td_error) > self.threshold
]
heapq.heapify(pq)
while pq:
_, action = heapq.heappop(pq)
if self.TD[action] < self.threshold:
continue
self.TD[action] = 0
for state in self.G.predecessors(action):
self.Q[state, action] = self.get_reward(state, action) + self.gamma * self.V[action]
v_new = max([self.Q[state, action] for action in self.G.successors(state)])
td_error = abs(v_new - self.V[state])
self.TD[state] += td_error
if td_error > self.threshold:
heapq.heappush(pq, (-td_error, state))
self.V[state] = v_new
return self.V
Zunächst wird die Sweep-Funktion verwendet, um den Wert aller Zustände zu aktualisieren, und der Heap wird basierend auf dem Zustand erstellt, in dem der TD-Fehler den Schwellenwert überschreitet. Wiederholen Sie danach die Aktualisierung, bis die Warteschlange erschöpft ist. Aktualisieren Sie zunächst den Aktionswert $ Q $, sodass der aus der Warteschlange entnommene Status zum nächsten Status (= Aktion) wird. Aktualisieren Sie dann $ V $ für den Statuswert, der vom aktualisierten Aktionswert abhängt. Wenn der Unterschied zwischen vor und nach dem Update (td_error) den Schwellenwert überschreitet, wird er in die Warteschlange verschoben. Die Verarbeitungszeit ist wie folgt und wir konnten ungefähr die doppelte Geschwindigkeit erreichen.
%timeit V = PrioritizedDP(G, gamma, max_sweeps, threshold).run()
4.06 s ± 115 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Bisher haben wir objektbasierte Networkx-Algorithmen implementiert, aber häufig genannte Methoden wie Nachfolger nehmen die meiste Zeit in Anspruch. Lassen Sie uns überlegen, wie Sie die Vektorberechnung nach Numpy verwenden und gleichzeitig die Datenstruktur effizienter gestalten können. Durch Ausdrücken des Diagramms als Numpy-Array wird es möglich, die Vektorberechnung für die Aktionswertberechnung wie folgt zu verwenden.
class ArraySyncDP:
def __init__(self, A : ArrayGraph, gamma, max_sweeps, threshold):
self.A = A
self.gamma = gamma
self.max_sweeps = max_sweeps
self.threshold = threshold
self.V = np.zeros(A.num_states)
self.TD = np.full(A.num_states, np.inf)
self.Q = np.zeros(A.num_actions)
def run(self):
for _ in range(self.max_sweeps):
self.Q[:] = self.A.reward + self.gamma * self.V[self.A.action2next_state]
for state_id in range(self.A.num_states):
start, end = self.A.state2action_start[state_id], self.A.state2action_start[state_id + 1]
v_new = self.Q[start : end].max()
self.TD[state_id] = abs(self.V[state_id] - v_new)
self.V[state_id] = v_new
if (self.TD < self.threshold).all():
return self.V
raise NonConvergenceError
%timeit V = ArraySyncDP(A, gamma, max_sweeps, threshold).run()
3.5 s ± 99.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Es ist etwas schneller als Priorized Sweeping, aber nicht sehr effektiv. Es scheint, dass der Hauptgrund darin besteht, dass wir nicht alles in Vektoren konvertieren konnten und immer noch Minuten verwenden, um den Statuswert zu aktualisieren.
Cython Von Natur aus halten Arrays ihre grundlegenden Datentypen in zusammenhängenden Speicherbereichen, sodass sie schneller zugänglich sein sollten als Listen, die auf im Speicher verstreute Objekte verweisen. Python scheint jedoch langsamer auf Elemente zuzugreifen als Listen und Wörterbücher, da es in Python-Objekte konvertiert wird, um auf einzelne Elemente des Arrays zu verweisen, was zu Overhead führt.
Denken wir also über die Verwendung von Cython nach, um den Array-Zugriff zu beschleunigen. Cython ist ein Compiler, der typbeschriftetes Python in eine kompilierte Erweiterung konvertiert. Das konvertierte Erweiterungsmodul kann wie ein normales Python-Modul durch Import geladen werden. Wenn Sie Cython verwenden, können Sie den Prozess des Elementzugriffs mithilfe des numpy-Arrays als Schnittstelle aus den folgenden Gründen beschleunigen.
Implementieren Sie asynchrones DP in Cython. Ich möchte eine Prioritätswarteschlange verwenden, überspringe jedoch aufgrund der Verarbeitung von Python-Objekten nur Statuswertaktualisierungen mit einem kleinen TD-Fehler.
%%cython
import numpy as np
cimport numpy as np
cimport cython
ctypedef np.float64_t FLOAT_t
ctypedef np.int64_t INT_t
@cython.boundscheck(False)
@cython.wraparound(False)
def cythonic_backup(
FLOAT_t[:] V, FLOAT_t[:] Q, FLOAT_t[:] TD, FLOAT_t[:] reward,
INT_t[:] state2action_start, INT_t[:] action2next_state,
INT_t[:] next_state2inv_action, INT_t[:, :] inv_action2state_action,
FLOAT_t gamma, FLOAT_t threshold
):
cdef INT_t num_updates, state, action, next_state, inv_action, start_inv_action, end_inv_action, start_action, end_action
cdef FLOAT_t v
num_updates = 0
for next_state in range(len(V)):
if TD[next_state] < threshold:
continue
num_updates += 1
TD[next_state] = 0
start_inv_action = next_state2inv_action[next_state]
end_inv_action = next_state2inv_action[next_state + 1]
for inv_action in range(start_inv_action, end_inv_action):
state = inv_action2state_action[inv_action][0]
action = inv_action2state_action[inv_action][1]
Q[action] = reward[action] + gamma * V[next_state]
start_action = state2action_start[state]
end_action = state2action_start[state + 1]
v = -1e9
for action in range(start_action, end_action):
if v < Q[action]:
v = Q[action]
if v > V[state]:
TD[state] += v - V[state]
else:
TD[state] += V[state] - v
V[state] = v
return num_updates
class CythonicAsyncDP(ArraySyncDP):
def run(self):
A = self.A
for i in range(self.max_sweeps):
num_updates = cythonic_backup(
self.V, self.Q, self.TD, A.reward, A.state2action_start, A.action2next_state,
A.next_state2inv_action, A.inv_action2state_action, self.gamma, self.threshold
)
if num_updates == 0:
return self.V
raise NonConvergenceError
%timeit V = CythonicAsyncDP(A, gamma, max_sweeps, threshold).run()
18.6 ms ± 947 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Eine dramatische Beschleunigung von 18,6 ms wurde erreicht. Mit dieser Verarbeitungsgeschwindigkeit können je nach Verwendungszweck auch Probleme mit etwa 1 Million Staaten angemessen behandelt werden.
[1] Hochleistungs-Python, Micha Gorelick und Ian Ozsvald, O'Reilly Japan, 2015. [2] Reinforcement Learning, R. S. Sutton and A. G. Barto, The MIT Press, 2018.
Recommended Posts