Bei Methoden wie DQN wurde das Q (s, a) jedes Zustands durch eine Richtlinie berechnet, und die Aktion, die den Q-Wert maximierte, wurde ausgewählt und ausgeführt, dies konnte jedoch nur diskrete Aktionen verarbeiten. Andererseits suchte DDPG nicht nach der Aktion zur Maximierung des Q-Werts, um dem kontinuierlichen Aktionsraum zu entsprechen, sondern reagierte mit der Parametrisierung der Richtlinie und der direkten Ausgabe der Aktion. Daher ist es eine entscheidende Maßnahme.
Es ist ein vertrauter Wiederholungspuffer für tiefgreifendes Lernen. Der aktuelle Status, die Aktion zu diesem Zeitpunkt, der nächste Status, die sofortige Belohnung und der Endstatus werden als ein Taple gespeichert.
from collections import deque, namedtuple
import random
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'done'))
class ReplayBuffer(object):
def __init__(self, capacity=1e6):
self.capacity = capacity
self.memory = deque([], maxlen=int(capacity))
def append(self, *args):
transition = Transition(*args)
self.memory.append(transition)
def sample(self, batch_size):
return random.sample(self.memory, batch_size)
def reset(self):
self.memory.clear()
def length(self):
return len(self.memory)
def __len__(self):
return len(self.memory)
In DDPG gibt es Actor $ \ mu (s) $, der die Aktion kontinuierlich aus dem aktuellen Status ausgibt, und Critic $ Q (s, a) $, der den Q-Wert aus dem aktuellen Status und der aktuellen Aktion ausgibt. Die Initialisierung der Gewichte jeder Schicht entspricht dem Originalpapier. Weitere Informationen finden Sie dort (Link unten). Charakteristisch ist, dass sich in der letzten Ebene des Schauspielers ein Tanh befindet und beim Empfang einer Aktion in Critic in der zweiten Ebene. Wenn Sie mit Pendel experimentieren, können Sie möglicherweise 2 in die Ausgabe schreiben, da der Aktionsbereich [-2, 2] beträgt.
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
def init_weight(size):
f = size[0]
v = 1. / np.sqrt(f)
return torch.tensor(np.random.uniform(low=-v, high=v, size=size), dtype=torch.float)
class ActorNetwork(nn.Module):
def __init__(self, num_state, num_action, hidden1_size=400, hidden2_size=300, init_w=3e-3):
super(ActorNetwork, self).__init__()
self.fc1 = nn.Linear(num_state[0], hidden1_size)
self.fc2 = nn.Linear(hidden1_size, hidden2_size)
self.fc3 = nn.Linear(hidden2_size, num_action[0])
self.num_state = num_state
self.num_action = num_action
self.fc1.weight.data = init_weight(self.fc1.weight.data.size())
self.fc2.weight.data = init_weight(self.fc2.weight.data.size())
self.fc3.weight.data.uniform_(-init_w, init_w)
def forward(self, x):
h = F.relu(self.fc1(x))
h = F.relu(self.fc2(h))
y = torch.tanh(self.fc3(h)) #Darf ich mit 2 multiplizieren?
return y
class CriticNetwork(nn.Module):
def __init__(self, num_state, num_action, hidden1_size=400, hidden2_size=300, init_w=3e-4):
super(CriticNetwork, self).__init__()
self.fc1 = nn.Linear(num_state[0], hidden1_size)
self.fc2 = nn.Linear(hidden1_size+num_action[0], hidden2_size)
self.fc3 = nn.Linear(hidden2_size, 1)
self.num_state = num_state
self.num_action = num_action
self.fc1.weight.data = init_weight(self.fc1.weight.data.size())
self.fc2.weight.data = init_weight(self.fc2.weight.data.size())
self.fc3.weight.data.uniform_(-init_w, init_w)
def forward(self, x, action):
h = F.relu(self.fc1(x))
h = F.relu(self.fc2(torch.cat([h, action], dim=1)))
y = self.fc3(h)
return y
Wenn Sie im Agenten eine Aktion auswählen, wird die Aktion so wie sie ist entscheidend. Fügen Sie also Rauschen $ \ mathcal {N} $ hinzu. Das Rauschen zu diesem Zeitpunkt ist [Ornstein-Woollenbeck-Prozess](https://ja.wikipedia.org/wiki/%E3%82%AA%E3%83%AB%E3%83%B3%E3%82%B7% E3% 83% A5% E3% 82% BF% E3% 82% A4% E3% 83% B3% EF% BC% 9D% E3% 82% A6% E3% 83% BC% E3% 83% AC% E3% 83% B3% E3% 83% 99% E3% 83% 83% E3% 82% AF% E9% 81% 8E% E7% A8% 8B). Ich kenne die Details nicht. Stellen Sie sich das als Lärm vor, der sich im Laufe der Zeit dem Durchschnitt nähert. Ich weiß es nicht.
a = \mu(s) + \mathcal{N}
Für das Training jedes Modells aktualisiert Critic das Modell, indem er den Gradienten ermittelt, um den TD-Fehler zu minimieren, ähnlich wie bei DQN. Die Verlustfunktion ist wie folgt. N ist die Chargengröße.
L = \frac{1}{N} \sum_{i=1}^N (r_i + \gamma Q^{\prime}(s_{i+1}, \mu^{\prime}(s_{i+1})) - Q(s_i, a_i))^2
Der Akteur aktualisiert das Modell, um den Q-Wert zu maximieren. Beachten Sie, dass der Verlust negativ ist, da er zu diesem Zeitpunkt maximiert wird. Die Zielfunktion ist wie folgt.
J = \frac{1}{N}\sum_{i=1}^N Q(s_{i}, \mu{s_i})
In der obigen Zielfunktion ist die mit einem Strich das Zielnetzwerk. Dies wird oft verwendet, um das Lernen zu stabilisieren. In DQN usw. wird dieses Zielnetzwerk alle paar Epochen aktualisiert, während in DDPG der Hyperparameter $ \ tau (\ ll 1) $ verwendet wird.
\theta \gets \tau \theta + (1 - \tau) \theta^{\prime}
Es wird langsam aktualisiert, wie. Dies stabilisiert das Lernen, aber es scheint, dass die Lernzeit etwas länger sein wird.
import torch
import torch.nn.functional as F
import numpy as np
import copy
class OrnsteinUhlenbeckProcess:
def __init__(self, theta=0.15, mu=0.0, sigma=0.2, dt=1e-2, x0=None, size=1, sigma_min=None, n_steps_annealing=1000):
self.theta = theta
self.mu = mu
self.sigma = sigma
self.dt = dt
self.x0 = x0
self.size = size
self.num_steps = 0
self.x_prev = self.x0 if self.x0 is not None else np.zeros(self.size)
if sigma_min is not None:
self.m = -float(sigma - sigma_min) / float(n_steps_annealing)
self.c = sigma
self.sigma_min = sigma_min
else:
self.m = 0
self.c = sigma
self.sigma_min = sigma
def current_sigma(self):
sigma = max(self.sigma_min, self.m * float(self.num_steps) + self.c)
return sigma
def sample(self):
x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + self.current_sigma() * np.sqrt(self.dt) * np.random.normal(size=self.size)
self.x_prev = x
self.num_steps += 1
return x
class DDPG:
def __init__(self, actor, critic, optimizer_actor, optimizer_critic, replay_buffer, device, gamma=0.99, tau=1e-3, epsilon=1.0, batch_size=64):
self.actor = actor
self.critic = critic
self.actor_target = copy.deepcopy(self.actor)
self.critic_target = copy.deepcopy(self.critic)
self.optimizer_actor = optimizer_actor
self.optimizer_critic = optimizer_critic
self.replay_buffer = replay_buffer
self.device = device
self.gamma = gamma
self.tau = tau
self.epsilon = epsilon
self.batch_size = batch_size
self.random_process = OrnsteinUhlenbeckProcess(size=actor.num_action[0])
self.num_state = actor.num_state
self.num_action = actor.num_action
def add_memory(self, *args):
self.replay_buffer.append(*args)
def reset_memory(self):
self.replay_buffer.reset()
def get_action(self, state, greedy=False):
state_tensor = torch.tensor(state, dtype=torch.float, device=self.device).view(-1, *self.num_state)
action = self.actor(state_tensor)
if not greedy:
action += self.epsilon*torch.tensor(self.random_process.sample(), dtype=torch.float, device=self.device)
return action.squeeze(0).detach().cpu().numpy()
def train(self):
if len(self.replay_buffer) < self.batch_size:
return None
transitions = self.replay_buffer.sample(self.batch_size)
batch = Transition(*zip(*transitions))
state_batch = torch.tensor(batch.state, device=self.device, dtype=torch.float)
action_batch = torch.tensor(batch.action, device=self.device, dtype=torch.float)
next_state_batch = torch.tensor(batch.next_state, device=self.device, dtype=torch.float)
reward_batch = torch.tensor(batch.reward, device=self.device, dtype=torch.float).unsqueeze(1)
not_done = np.array([(not done) for done in batch.done])
not_done_batch = torch.tensor(not_done, device=self.device, dtype=torch.float).unsqueeze(1)
# need to change
qvalue = self.critic(state_batch, action_batch)
next_qvalue = self.critic_target(next_state_batch, self.actor_target(next_state_batch))
target_qvalue = reward_batch + (self.gamma * next_qvalue * not_done_batch)
critic_loss = F.mse_loss(qvalue, target_qvalue)
self.optimizer_critic.zero_grad()
critic_loss.backward()
self.optimizer_critic.step()
actor_loss = -self.critic(state_batch, self.actor(state_batch)).mean()
self.optimizer_actor.zero_grad()
actor_loss.backward()
self.optimizer_actor.step()
# soft parameter update
for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()):
target_param.data.copy_(target_param.data * (1.0 - self.tau) + param.data * self.tau)
for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
target_param.data.copy_(target_param.data * (1.0 - self.tau) + param.data * self.tau)
Daran ist nichts Neues. Wie andere Verstärkungslernalgorithmen empfängt es den Zustand von der Umgebung und gibt Ihnen das Gefühl, zu handeln und zu lernen. Jeder Hyperparameter stimmt mit dem Originalpapier überein. (Vielleicht)
import torch
import torch.optim as optim
import gym
max_episodes = 300
memory_capacity = 1e6 #Pufferkapazität
gamma = 0.99 #Diskontsatz
tau = 1e-3 #Zielaktualisierungsrate
epsilon = 1.0 #Wenn Sie sich mit der Menge an Lärm anlegen möchten, brauchen Sie sie wahrscheinlich nicht
batch_size = 64
lr_actor = 1e-4
lr_critic = 1e-3
logger_interval = 10
weight_decay = 1e-2
env = gym.make('Pendulum-v0')
num_state = env.observation_space.shape
num_action = env.action_space.shape
max_steps = env.spec.max_episode_steps
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
actorNet = ActorNetwork(num_state, num_action).to(device)
criticNet = CriticNetwork(num_state, num_action).to(device)
optimizer_actor = optim.Adam(actorNet.parameters(), lr=lr_actor)
optimizer_critic = optim.Adam(criticNet.parameters(), lr=lr_critic, weight_decay=weight_decay)
replay_buffer = ReplayBuffer(capacity=memory_capacity)
agent = DDPG(actorNet, criticNet, optimizer_actor, optimizer_critic, replay_buffer, device, gamma, tau, epsilon, batch_size)
for episode in range(max_episodes):
observation = env.reset()
total_reward = 0
for step in range(max_steps):
action = agent.get_action(observation)
next_observation, reward, done, _ = env.step(action)
total_reward += reward
agent.add_memory(observation, action, next_observation, reward, done)
agent.train()
observation = next_observation
if done:
break
if episode % logger_interval == 0:
print("episode:{} total reward:{}".format(episode, total_reward))
for episode in range(3):
observation = env.reset()
env.render()
for step in range(max_steps):
action = agent.get_action(observation, greedy=True)
next_observation, reward, done, _ = env.step(action)
observation = next_observation
env.render()
if done:
break
env.close()
Wenn Sie die Umgebung von gym.make ändern, sollten Sie in der Lage sein, in anderen Umgebungen zu lernen.
Es ist eine grafische Darstellung der kumulativen Belohnungen und Lernepisoden. Ich denke, Sie lernen auf gute Weise. Es wurde richtig gebaut. Toll. (Gif ist nicht veröffentlicht, weil ich nicht weiß, wie ich es machen soll)
Wenn Sie den obigen Code so ausführen, wie er sich in einer Datei befindet, können Sie den Vorgang grundsätzlich überprüfen. Da die Implementierung voll war, bleiben einige zusätzliche Variablen usw. übrig. Mit Pendulum-v0 können Sie nur mit CPU lernen, auch wenn Sie sich nicht in einer Umgebung befinden, in der GPU verfügbar ist. Das Lernen kann jedoch etwas instabil werden. Versuchen Sie es daher erneut. Wenn es eine Gelegenheit gibt, werden wir andere Methoden implementieren.
Continuous Control with Deep Reinforcement Learning
Recommended Posts