Ich habe versucht, PPO in Python zu implementieren

Über PPO

PPO ist einer der tiefgreifenden Lernalgorithmen, unter denen es auf Richtlinien basiert. Eine Richtlinienbasis ist ein Algorithmus, der eine Richtlinienfunktion direkt optimiert, die eine Aktionswahrscheinlichkeit in einer bestimmten Umgebung ausgibt. Andere richtlinienbasierte Algorithmen umfassen A3C und TRPO. Nicht auf Richtlinien basierende Algorithmen umfassen wertbasierte Algorithmen wie DQN.

Vergleich mit anderen Methoden

Zunächst werde ich andere Methoden erläutern.

Über A3C

In A3C werden drei typische Techniken verwendet: Actor-Critic, Advantage und Asynchronous.

Actor-Critic Actor-Critic ist ein Merkmal der Netzwerkstruktur. In A3C als Zielfunktion der Politik

L_{policy}=A(t)log\pi_{\theta}(a_{t}|s_{t})

Wird genutzt. Von diesen wird A (t) als Vorteilsfunktion bezeichnet.

A(t)=(R(t)-V(s_{t}))

Es wird vertreten durch. Da die Wertfunktion V (s) verwendet wird, um dieses A (t) zu erhalten, handelt es sich um eine Methode zum Erstellen eines Modells, so dass der Statuswert gleichzeitig mit der Richtlinie zur Ausgabe des Netzwerks wird (Aktionswahrscheinlichkeitsverteilung). So können Sie schneller lernen.

Advantage Verwenden Sie den folgenden Fehler, um die Normalzustandswertfunktion $ V (t) $ zu aktualisieren

loss=r(s_{t})+\gamma V(s_{t+1})-V(s_{t})

Wir werden lernen, $ V (s) $ zu befriedigen. Wenn bei dieser Methode jedoch die Anzahl der Schritte in einer Episode groß ist, nimmt die Anzahl der Lernvorgänge zu, die erforderlich sind, um sich auf die Schritte auszudehnen, bei denen der Einfluss des Lernens früh ist, und das Lernen wird langsam. Daher verwendet Advantage den folgenden Fehler.

loss=\sum_{k=1}^n \gamma^{k-1} r(s_{t+k})+\gamma^n V(s_{t+2})-V(s_{t})

Durch Anpassen von $ n $ in dieser Gleichung breiten sich die Auswirkungen des weiteren Lernens schneller aus. Wenn es jedoch zu groß ist, verlangsamt sich die Lerngeschwindigkeit. Beispielsweise ist CartPole nicht sehr effektiv und in einigen Fällen kann es schneller sein, Advantage nicht zu verwenden. Asynchronous Asynchron ist eine Methode, die sich auf Lernmethoden bezieht. Normalerweise ist bei der Suche mit einem Agenten die Lernrichtung tendenziell voreingenommen. Als Gegenmaßnahme wird bei der Suche mit einem gemeinsamen neuronalen Netzwerk und mehreren Agenten jeweils eine feste Anzahl von Schritten festgelegt. Im Laufe der Zeit oder am Ende einer Episode aktualisiert jeder Agent die gemeinsam genutzten Netzwerkparameter für einen Gradienten über die Zielfunktionsparameter.

Jeder Agent verfügt außerdem über ein neuronales Netzwerk, das Parameter aus dem gemeinsam genutzten Netzwerk kopiert und vor Beginn der Episode sucht. Dies verhindert eine Verzerrung beim Lernen, ähnlich wie beim Wiedergabepuffer in DQN.

A3C-Zielfunktion

Die Zielfunktion von A3C verwendet drei Methoden: Richtlinie, Zustandswert und Entropie für die Regularisierung.

\begin{align}
L_{policy} &= A(t)log\pi_{\theta}(a_{t}|s_{t})\\
L_{value} &= (R(t)-V(s_{t}))^2\\
L_{entropy} &= \pi_{\theta}(a_{t}|s_{t})log\pi_{\theta}(a_{t}|s_{t})
\end{align}

Es wird ausgedrückt als, und schließlich wird es die folgende Formel in Form der Kombination dieser.

L_{all} = -L_{policy}+C_{value}L_{value}-C_{entropy}L_{entropy}

$ C_ {Wert} $ und $ C_ {Entropie} $ repräsentieren Konstanten. Das Lernen erfolgt durch Minimierung.

Über PPO

In A3C beträgt der Richtliniengradient

\Delta L_{policy} = \Delta log\pi_{\theta}(a_{t}|s_{t})A(t)

Es wird ausgedrückt als und da $ log $ im Ausdruck enthalten ist, wird es beim Aktualisieren sehr groß. Daher ist es in PPO durch Einschränken der Aktualisierung möglich, eine Überaktualisierung zu verhindern. Auch die Zielfunktion unterscheidet sich stark von A3C.

r_{t}(\theta)=\frac{\pi_{\theta_{new}}(a_{t}|s_{t})}{\pi_{\theta_{old}}(a_{t}|s_{t})}\\
L^{CPI}=\mathbb E \big[\,r_{t}(\theta)A(t)\, \big]

Verwenden Sie diese Funktion als Ersatzzielfunktion und verwenden Sie die Clip-Funktion, um sie beim Aktualisieren zur Zielfunktion der Richtlinie zu machen. Clip-Funktion

clip(x,a,b)=\left\{
\begin{array}{ll}
b & (x > b) \\
x & (a \leq x \leq b) \\
a & (x < a)
\end{array}
\right.

Auf diese Weise ausgedrückt, egal wie sich $ x $ ändert, passt es zwischen $ a $ und $ b $. Die Zielfunktion mit dieser Funktion

L_{policy}=min \big(\, r_{t}(\theta)A(t),clip(r_{t}(\theta),1-\epsilon,1+\epsilon)\, \big)

Wird ausgedrückt als. In Bezug auf die Statuswertfunktion ist sie fast identisch mit PPO.

Darüber hinaus lernt PPO die Verwendung von Advantage auf die gleiche Weise wie A3C.

Implementierung

Ich habe bei der Implementierung auf die folgende Site verwiesen [Lernen stärken] PPO zum Lernen während der Implementierung von [Stehen mit Wagenstange: Komplett mit 1 Datei]

Übersicht über die Verarbeitung

main (): Erstellen Sie einen Thread und führen Sie die Verarbeitung durch

Worker(thread_type, thread_name, ppo_brain) -run_thread (): Separate Verarbeitung nach Thread-Typ -env_run (): Lassen Sie den Agenten die Umgebung erkunden.

ppo_agent(ppo_brain) -action (state): Empfängt den Status und gibt die Aktion aus -greedy_action (state): Ausgabeaktion mit der Methode $ \ epsilon-greedy $ -update (Speicher): Lernen Sie die Daten kennen, die während der Suche unter Berücksichtigung des Vorteils gespeichert wurden ppo_brain() -build_graph: Definieren Sie hier die Form des Diagramms -update: Update

main

def main(args):
    #Prozess zum Erstellen eines Threads
    with tf.device("/cpu:0"):
        brain = ppo_brain()
        thread=[]
        for i in range(WORKER_NUM):
            thread_name = "local_thread"+str(i)
            thread.append(Worker(thread_type = "train",thread_name = thread_name,brain = brain))
    
    COORD = tf.train.Coordinator()
    SESS.run(tf.global_variables_initializer())
    saver = tf.train.Saver()
    
  #Laden Sie den vorherigen Trainingsprozess. Führen Sie dies grundsätzlich nach der Definition des Modells durch
    if args.load:
        ckpt = tf.train.get_checkpoint_state(MODEL_DIR)
        if ckpt:
            saver.restore(SESS,MODEL_SAVE_PATH)

    runnning_thread=[]
    for worker in thread:
        job = lambda: worker.run_thread()
        t = threading.Thread(target=job)
        t.start()
        runnning_thread.append(t)
    COORD.join(runnning_thread)
   
    #Machen Sie einen Test, wenn das Lernen vorbei ist
    test = Worker(thread_type = "test",thread_name = "test_thread",brain=brain)
    test.run_thread()

    if args.save:
        saver.save(SESS,MODEL_SAVE_PATH)
        print("saved")

Worker

class Worker:
    def __init__(self,thread_type,thread_name,brain):
        self.thread_type = thread_type
        self.name = thread_name
        self.agent = ppo_agent(brain)
        self.env = gym.make(ENV_NAME)
        #Speichern Sie das Video zum Zeitpunkt des Tests
        if self.thread_type == "test" and args.video:
            self.env = wrappers.Monitor(self.env, VIDEO_DIR, force = True)
        self.leaning_memory = np.zeros(10)
        self.memory = []
        self.total_trial = 0

    def run_thread(self):
        while True:
            if self.thread_type == "train" and not isLearned:
                self.env_run()
            elif self.thread_type == "train" and isLearned:
                sleep(3)
                break
            elif self.thread_type == "test" and not isLearned:
                sleep(3)
            elif self.thread_type == "test" and isLearned:
                self.env_run()
                break

    def env_run(self):
        global isLearned
        global frame
        self.total_trial += 1

        step = 0
        observation = self.env.reset()

        while True:
            step += 1
            frame += 1
        
       #Aktionsauswahl
            if self.thread_type == "train":
                action=self.agent.greedy_action(observation)
            elif self.thread_type == "test":
                self.env.render()
                sleep(0.01)
                action=self.agent.action(observation)
            
            next_observation,_,done,_ = self.env.step(action)

            reward = 0

            if done:
                if step >= 199:
                    reward = 1 #Zum Zeitpunkt des Erfolgs
                else:
                    reward =- 1 #Zum Zeitpunkt des Scheiterns
            else:
                #Wenn es nicht vorbei ist
                reward+=0
            
       #Speichern Sie das Ergebnis im Speicher
            self.memory.append([observation,action,reward,done,next_observation])

            observation = next_observation

            if done:
                break

     #Berechnen Sie die durchschnittliche Punktzahl von 10 Mal
        self.leaning_memory = np.hstack((self.leaning_memory[1:],step))
        print("Thread:",self.name," Thread_trials:",self.total_trial," score:",step," mean_score:",self.leaning_memory.mean()," total_step:",frame)

     #Am Ende des Lernens
        if self.leaning_memory.mean() >= 199:
            isLearned = True
            sleep(3)
        else:
            #Parameteraktualisierung
            self.agent.update(self.memory)
            self.memory = []
        

ppo_agent

class ppo_agent:
    def __init__(self,brain):
        self.brain=brain
        self.memory=[]

    #Handle ohne zufällige Elemente
    def action(self,state):
        prob,v = self.brain.predict(state)
        return np.random.choice(ACTION_LIST,p = prob)

    #Handle zufällig mit einer bestimmten Wahrscheinlichkeit
    def greedy_action(self,state):
        if frame >= EPS_STEPS:   
            eps = EPS_END
        else:
            eps = EPS_START + frame* (EPS_END - EPS_START) / EPS_STEPS  

        if np.random.random() <= eps:
            return np.random.choice(ACTION_LIST)
        else:
            return self.action(state)

    #Verarbeiten Sie das Suchergebnis und ppo_An Gehirnklasse senden
    def update(self,memory):
        R = sum([memory[j][2] * (GAMMA ** j) for j in range(ADVANTAGE + 1)])
        self.memory.append([memory[0][0], memory[0][1], R,memory[0][3], memory[0][4], GAMMA ** ADVANTAGE])

        #Betrachten Sie den Vorteil
        for i in range(1, len(memory) - ADVANTAGE):
            R = ((R - memory[i-1][2]) / GAMMA) + memory[i + ADVANTAGE][2] * (GAMMA ** (ADVANTAGE - 1))
            self.memory.append([memory[i][0], memory[i][1], R,memory[i + ADVANTAGE][3], memory[i][4],GAMMA ** ADVANTAGE])
            
        for i in range(ADVANTAGE - 1):
            R = ((R - memory[len(memory) - ADVANTAGE + i][2]) / GAMMA)
            self.memory.append([memory[i][0], memory[i][1], R, True, memory[i][4], GAMMA ** (ADVANTAGE - i)])

        #ppo_Senden Sie Daten zur Aktualisierung an die Gehirnklasse
        self.brain.update(self.memory)

        self.memory = []

ppo_brain

class ppo_brain:
    def __init__(self):
        self.build_model()
        self.name="brain"
        self.prob_old=1.0

    def build_model(self):
        self.input=tf.placeholder(dtype=tf.float32,shape=[None,STATE_NUM])
       
        #Hier werden das Modell des alten Parameters und das Modell des neuen Parameters definiert und die Aktionswahrscheinlichkeit und der Zustandswert für dieselbe Eingabe ausgegeben.
        #Neues Netzwerk
        with tf.variable_scope("current_brain"):
            hidden1=tf.layers.dense(self.input,HIDDEN_LAYERE,activation=tf.nn.leaky_relu)
            self.prob=tf.layers.dense(hidden1,ACTION_NUM,activation=tf.nn.softmax)
            self.v=tf.layers.dense(hidden1,1)
        #Altes Netzwerk
        with tf.variable_scope("old_brain"):
            old_hiddend1=tf.layers.dense(self.input,HIDDEN_LAYERE,activation=tf.nn.leaky_relu)
            self.old_prob=tf.layers.dense(hidden1,ACTION_NUM,activation=tf.nn.softmax)
            self.old_v=tf.layers.dense(hidden1,1)

        self.reward=tf.placeholder(dtype=tf.float32,shape=(None,1))
        self.action=tf.placeholder(dtype=tf.float32,shape=(None,ACTION_NUM))

###########Nachfolgend finden Sie die Definition der Verlustfunktion############
        #Definition der Vorteilsfunktion
        advantage = self.reward-self.v

        #Definitionsteil der Verlustfunktion der Police
        r_theta = tf.div(self.prob + 1e-10, tf.stop_gradient(self.old_prob) + 1e-10)
        action_theta = tf.reduce_sum(tf.multiply(r_theta, self.action), axis = 1, keepdims = True)
        #Berechne den Clip von r
        r_clip = tf.clip_by_value(action_theta, 1 - EPSIRON, 1 + EPSIRON)
        #Wenn Sie die Vorteilsfunktion als Zielfunktion der Richtlinie verwenden, wird der Gradient nicht berücksichtigt_Verwenden Sie den Farbverlauf
        advantage_cpi = tf.multiply(action_theta, tf.stop_gradient(advantage))
        advantage_clip = tf.multiply(r_clip , tf.stop_gradient(advantage))
        self.policy_loss = tf.minimum(advantage_clip , advantage_cpi)

        #Zustandswertverlustfunktion
        self.value_loss = tf.square(advantage)

        #Definition der Entropie
        self.entropy = tf.reduce_sum(self.prob*tf.log(self.prob+1e-10),axis = 1,keepdims = True)

        #Definition der Endverlustfunktion
        self.loss = tf.reduce_sum(-self.policy_loss + LOSS_V * self.value_loss - LOSS_ENTROPY * self.entropy)

##############Im Folgenden werden die für die Aktualisierung erforderlichen Aktionen definiert##############

        #Parameteraktualisierung (mit Adam minimiert)
        self.opt = tf.train.AdamOptimizer(learning_rate = LEARNING_RATE)
        self.minimize = self.opt.minimize(self.loss)

        #Holen Sie sich neue und alte Parameter aus ihren jeweiligen Netzwerken
        self.weight_param = tf.get_collection(key = tf.GraphKeys.TRAINABLE_VARIABLES, scope = "current_brain")
        self.old_weight_param = tf.get_collection(key = tf.GraphKeys.TRAINABLE_VARIABLES, scope = "old_brain")

        #Ersetzen Sie alte Netzwerkparameter durch neue Netzwerkparameter
        self.insert = [g_p.assign(l_p) for l_p,g_p in zip(self.weight_param,self.old_weight_param)]

    #Aktionswahrscheinlichkeit und Zustandswert aus Zustand ausgeben
    def predict(self,state):
        state=np.array(state).reshape(-1,STATE_NUM)
        feed_dict={self.input:state}
        p,v=SESS.run([self.prob,self.v],feed_dict)
        return p.reshape(-1),v.reshape(-1)
    
    #Erstellen Sie einen Stapel durch Vorverarbeitung, bevor Sie Daten eingeben
    #Aktualisieren
    def update(self,memory):
        length=len(memory)
       
        s_=np.array([memory[j][0] for j in range(length)]).reshape(-1,STATE_NUM)
        a_=np.eye(ACTION_NUM)[[memory[j][1] for j in range(length)]].reshape(-1,ACTION_NUM)
        R_=np.array([memory[j][2] for j in range(length)]).reshape(-1,1)
        d_=np.array([memory[j][3] for j in range(length)]).reshape(-1,1)
        s_mask=np.array([memory[j][5] for j in range(length)]).reshape(-1,1)
        _s=np.array([memory[j][4] for j in range(length)]).reshape(-1,STATE_NUM)

        #Schliessen Sie den späteren Zustandswert ab
        _, v=self.predict(_s)
        #Berechnen Sie die Belohnung unter Berücksichtigung des Vorteils
        R=(np.where(d_,0,1)*v.reshape(-1,1))*s_mask+R_
        #Parameteraktualisierung
        feed_dict={self.input:s_, self.action:a_, self.reward:R}

        SESS.run(self.minimize,feed_dict)
        #Netzwerk-Update
        SESS.run(self.insert)

Ganzer Code

Der gesamte Code sieht folgendermaßen aus:

import argparse
import tensorflow as tf
import numpy as np
import random
import threading
import gym
from time import sleep
from gym import wrappers
from os import path

parser=argparse.ArgumentParser(description="Reiforcement training with PPO",add_help=True)
parser.add_argument("--model",type=str,required=True,help="model base name. required")
parser.add_argument("--env_name",default="CartPole-v0",help="environment name. default is CartPole-v0")
parser.add_argument("--save",action="store_true",default=False,help="save command")
parser.add_argument("--load",action="store_true",default=False,help="load command")
parser.add_argument("--thread_num",type=int,default=5)
parser.add_argument("--video",action="store_true",default=False, help="write this if you want to save as video")
args=parser.parse_args()


ENV_NAME=args.env_name
WORKER_NUM=args.thread_num
#define constants
VIDEO_DIR="./train_info/video"
MODEL_DIR="./train_info/models"
MODEL_SAVE_PATH=path.join(MODEL_DIR,args.model)

ADVANTAGE=2
STATE_NUM=4
ACTION_LIST=[0,1]
ACTION_NUM=2
#epsiron parameter
EPS_START = 0.5
EPS_END = 0.1
EPS_STEPS = 200 * WORKER_NUM**2
#learning parameter
GAMMA=0.99
LEARNING_RATE=0.002
#loss constants
LOSS_V=0.5
LOSS_ENTROPY=0.02
HIDDEN_LAYERE=30

EPSIRON = 0.2

class ppo_brain:
    def __init__(self):
        self.build_model()
        self.name="brain"
        self.prob_old=1.0

    def build_model(self):
        self.input=tf.placeholder(dtype=tf.float32,shape=[None,STATE_NUM])
       
        #Hier werden das Modell des alten Parameters und das Modell des neuen Parameters definiert und die Aktionswahrscheinlichkeit und der Zustandswert für dieselbe Eingabe ausgegeben.
        with tf.variable_scope("current_brain"):
            hidden1=tf.layers.dense(self.input,HIDDEN_LAYERE,activation=tf.nn.leaky_relu)
            self.prob=tf.layers.dense(hidden1,ACTION_NUM,activation=tf.nn.softmax)
            self.v=tf.layers.dense(hidden1,1)
        with tf.variable_scope("old_brain"):
            old_hiddend1=tf.layers.dense(self.input,HIDDEN_LAYERE,activation=tf.nn.leaky_relu)
            self.old_prob=tf.layers.dense(hidden1,ACTION_NUM,activation=tf.nn.softmax)
            self.old_v=tf.layers.dense(hidden1,1)

        self.reward=tf.placeholder(dtype=tf.float32,shape=(None,1))
        self.action=tf.placeholder(dtype=tf.float32,shape=(None,ACTION_NUM))

###########Nachfolgend finden Sie die Definition der Verlustfunktion############
        #Definition der Vorteilsfunktion
        advantage = self.reward-self.v

        #Definitionsteil der Verlustfunktion der Police
        r_theta = tf.div(self.prob + 1e-10, tf.stop_gradient(self.old_prob) + 1e-10)
        action_theta = tf.reduce_sum(tf.multiply(r_theta, self.action), axis = 1, keepdims = True)
        r_clip = tf.clip_by_value(action_theta, 1 - EPSIRON, 1 + EPSIRON)
        advantage_cpi = tf.multiply(action_theta, tf.stop_gradient(advantage))
        advantage_clip = tf.multiply(r_clip , tf.stop_gradient(advantage))
        self.policy_loss = tf.minimum(advantage_clip , advantage_cpi)

        #Zustandswertverlustfunktion
        self.value_loss = tf.square(advantage)

        #Definition der Entropie
        self.entropy = tf.reduce_sum(self.prob*tf.log(self.prob+1e-10),axis = 1,keepdims = True)

        #Definition der Endverlustfunktion
        self.loss = tf.reduce_sum(-self.policy_loss + LOSS_V * self.value_loss - LOSS_ENTROPY * self.entropy)

##############Im Folgenden werden die für die Aktualisierung erforderlichen Aktionen definiert##############

        #Parameteraktualisierung (mit Adam minimiert)
        self.opt = tf.train.AdamOptimizer(learning_rate = LEARNING_RATE)
        self.minimize = self.opt.minimize(self.loss)

        #Holen Sie sich neue und alte Parameter aus ihren jeweiligen Netzwerken
        self.weight_param = tf.get_collection(key = tf.GraphKeys.TRAINABLE_VARIABLES, scope = "current_brain")
        self.old_weight_param = tf.get_collection(key = tf.GraphKeys.TRAINABLE_VARIABLES, scope = "old_brain")

        #Ersetzen Sie alte Netzwerkparameter durch neue Netzwerkparameter
        self.insert = [g_p.assign(l_p) for l_p,g_p in zip(self.weight_param,self.old_weight_param)]

    #Aktionswahrscheinlichkeit und Zustandswert aus Zustand ausgeben
    def predict(self,state):
        state=np.array(state).reshape(-1,STATE_NUM)
        feed_dict={self.input:state}
        p,v=SESS.run([self.prob,self.v],feed_dict)
        return p.reshape(-1),v.reshape(-1)
    
    #Erstellen Sie einen Stapel durch Vorverarbeitung, bevor Sie Daten eingeben
    #Aktualisieren
    def update(self,memory):
        length=len(memory)
       
        s_=np.array([memory[j][0] for j in range(length)]).reshape(-1,STATE_NUM)
        a_=np.eye(ACTION_NUM)[[memory[j][1] for j in range(length)]].reshape(-1,ACTION_NUM)
        R_=np.array([memory[j][2] for j in range(length)]).reshape(-1,1)
        d_=np.array([memory[j][3] for j in range(length)]).reshape(-1,1)
        s_mask=np.array([memory[j][5] for j in range(length)]).reshape(-1,1)
        _s=np.array([memory[j][4] for j in range(length)]).reshape(-1,STATE_NUM)

        #Schliessen Sie den späteren Zustandswert ab
        _, v=self.predict(_s)
        #Berechnen Sie die Belohnung unter Berücksichtigung des Vorteils
        R=(np.where(d_,0,1)*v.reshape(-1,1))*s_mask+R_
        #Parameteraktualisierung
        feed_dict={self.input:s_, self.action:a_, self.reward:R}

        SESS.run(self.minimize,feed_dict)
        #Netzwerk-Update
        SESS.run(self.insert)


class ppo_agent:
    def __init__(self,brain):
        self.brain=brain
        self.memory=[]

    #Handle ohne zufällige Elemente
    def action(self,state):
        prob,v = self.brain.predict(state)
        return np.random.choice(ACTION_LIST,p = prob)

    #Handle zufällig mit einer bestimmten Wahrscheinlichkeit
    def greedy_action(self,state):
        if frame >= EPS_STEPS:   
            eps = EPS_END
        else:
            eps = EPS_START + frame* (EPS_END - EPS_START) / EPS_STEPS  

        if np.random.random() <= eps:
            return np.random.choice(ACTION_LIST)
        else:
            return self.action(state)

    #Verarbeiten Sie das Suchergebnis und ppo_An Gehirnklasse senden
    def update(self,memory):
        R = sum([memory[j][2] * (GAMMA ** j) for j in range(ADVANTAGE + 1)])
        self.memory.append([memory[0][0], memory[0][1], R,memory[0][3], memory[0][4], GAMMA ** ADVANTAGE])

        #Betrachten Sie den Vorteil
        for i in range(1, len(memory) - ADVANTAGE):
            R = ((R - memory[i-1][2]) / GAMMA) + memory[i + ADVANTAGE][2] * (GAMMA ** (ADVANTAGE - 1))
            self.memory.append([memory[i][0], memory[i][1], R,memory[i + ADVANTAGE][3], memory[i][4],GAMMA ** ADVANTAGE])
            
        for i in range(ADVANTAGE - 1):
            R = ((R - memory[len(memory) - ADVANTAGE + i][2]) / GAMMA)
            self.memory.append([memory[i][0], memory[i][1], R, True, memory[i][4], GAMMA ** (ADVANTAGE - i)])

        #ppo_Senden Sie Daten zur Aktualisierung an die Gehirnklasse
        self.brain.update(self.memory)

        self.memory = []


class Worker:
    def __init__(self,thread_type,thread_name,brain):
        self.thread_type = thread_type
        self.name = thread_name
        self.agent = ppo_agent(brain)
        self.env = gym.make(ENV_NAME)
        #Speichern Sie das Video zum Zeitpunkt des Tests
        if self.thread_type == "test" and args.video:
            self.env = wrappers.Monitor(self.env, VIDEO_DIR, force = True)
        self.leaning_memory = np.zeros(10)
        self.memory = []
        self.total_trial = 0

    def run_thread(self):
        while True:
            if self.thread_type == "train" and not isLearned:
                self.env_run()
            elif self.thread_type == "train" and isLearned:
                sleep(3)
                break
            elif self.thread_type == "test" and not isLearned:
                sleep(3)
            elif self.thread_type == "test" and isLearned:
                self.env_run()
                break

    def env_run(self):
        global isLearned
        global frame
        self.total_trial += 1

        step = 0
        observation = self.env.reset()

        while True:
            step += 1
            frame += 1

            if self.thread_type == "train":
                action=self.agent.greedy_action(observation)
            elif self.thread_type == "test":
                self.env.render()
                sleep(0.01)
                action=self.agent.action(observation)
            
            next_observation,_,done,_ = self.env.step(action)

            reward = 0

            if done:
                if step >= 199:
                    reward = 1 #Zum Zeitpunkt des Erfolgs
                else:
                    reward =- 1 #Zum Zeitpunkt des Scheiterns
            else:
                #Wenn es nicht vorbei ist
                reward+=0
            
       #Speichern Sie das Ergebnis im Speicher
            self.memory.append([observation,action,reward,done,next_observation])

            observation = next_observation

            if done:
                break

     #Berechnen Sie die durchschnittliche Punktzahl von 10 Mal
        self.leaning_memory = np.hstack((self.leaning_memory[1:],step))
        print("Thread:",self.name," Thread_trials:",self.total_trial," score:",step," mean_score:",self.leaning_memory.mean()," total_step:",frame)

     #Am Ende des Lernens
        if self.leaning_memory.mean() >= 199:
            isLearned = True
            sleep(3)
        else:
            #Parameteraktualisierung
            self.agent.update(self.memory)
            self.memory = []

def main(args):
    #Prozess zum Erstellen eines Threads
    with tf.device("/cpu:0"):
        brain = ppo_brain()
        thread=[]
        for i in range(WORKER_NUM):
            thread_name = "local_thread"+str(i)
            thread.append(Worker(thread_type = "train",thread_name = thread_name,brain = brain))
    
    COORD = tf.train.Coordinator()
    SESS.run(tf.global_variables_initializer())
    saver = tf.train.Saver()
    
  #Laden Sie den vorherigen Trainingsprozess. Führen Sie dies grundsätzlich nach der Definition des Modells durch
    if args.load:
        ckpt = tf.train.get_checkpoint_state(MODEL_DIR)
        if ckpt:
            saver.restore(SESS,MODEL_SAVE_PATH)

    runnning_thread=[]
    for worker in thread:
        job = lambda: worker.run_thread()
        t = threading.Thread(target=job)
        t.start()
        runnning_thread.append(t)
    COORD.join(runnning_thread)
   
    #Machen Sie einen Test, wenn das Lernen vorbei ist
    test = Worker(thread_type = "test",thread_name = "test_thread",brain=brain)
    test.run_thread()

    if args.save:
        saver.save(SESS,MODEL_SAVE_PATH)
        print("saved")

if __name__=="__main__":
    SESS=tf.Session()
    frame=0
    isLearned=False
    
    main(args)

print("end")

Zusammenfassung

Deshalb habe ich dieses Mal PPO implementiert. Es scheint, dass ein Hauptmerkmal von PPO darin besteht, dass es trotz seines einfachen Mechanismus hohe Ergebnisse erzielt. Ich habe ein wenig über TRPO recherchiert, aber es schien ziemlich schwierig zu sein, damit zu arbeiten, deshalb habe ich diesmal eine ausführliche Erklärung weggelassen. Als nächstes möchte ich die Implementierung von PPO im kontinuierlichen Aktionsraum oder anderen Methoden zusammenfassen.

Recommended Posts

Ich habe versucht, PPO in Python zu implementieren
Ich habe versucht, Permutation in Python zu implementieren
Ich habe versucht, PLSA in Python 2 zu implementieren
Ich habe versucht, ADALINE in Python zu implementieren
Ich habe versucht, TOPIC MODEL in Python zu implementieren
Ich habe versucht, eine selektive Sortierung in Python zu implementieren
Ich habe versucht, einen Pseudo-Pachislot in Python zu implementieren
Ich habe versucht, Drakues Poker in Python zu implementieren
Ich habe versucht, GA (genetischer Algorithmus) in Python zu implementieren
Ich habe versucht, einen eindimensionalen Zellautomaten in Python zu implementieren
Ich habe versucht, die Mail-Sendefunktion in Python zu implementieren
Ich habe versucht, das Blackjack of Trump-Spiel mit Python zu implementieren
Ich habe versucht, PCANet zu implementieren
Ich habe versucht, ein missverstandenes Gefangenendilemma in Python zu implementieren
Ich habe versucht, StarGAN (1) zu implementieren.
Ich habe versucht, die Bayes'sche lineare Regression durch Gibbs-Sampling in Python zu implementieren
Ich habe versucht, Trumps Kartenspiel in Python zu implementieren
Ich habe versucht, die in Python installierten Pakete grafisch darzustellen
Ich möchte Timeout einfach in Python implementieren
Ich habe versucht, Mine Sweeper auf dem Terminal mit Python zu implementieren
Ich habe versucht zusammenzufassen, wie man Pandas von Python benutzt
Ich habe versucht, Deep VQE zu implementieren
Ich habe versucht, Python zu berühren (Installation)
Ich habe versucht, eine kontroverse Validierung zu implementieren
Ich habe versucht, Realness GAN zu implementieren
Ich habe Line Benachrichtigung in Python versucht
Ich habe versucht, ein scheinbar Windows-Snipper-Tool mit Python zu implementieren
Ich habe versucht, API list.csv mit Python aus swagger.yaml zu erstellen
Ich habe versucht "Wie man eine Methode in Python dekoriert"
Ich habe eine Stoppuhr mit tkinter mit Python gemacht
Ich habe versucht, Autoencoder mit TensorFlow zu implementieren
Python3-Standardeingabe habe ich versucht zusammenzufassen
Ich habe versucht, die Bayes'sche Optimierung von Python zu verwenden
Ich wollte ABC159 mit Python lösen
Ich habe versucht, CVAE mit PyTorch zu implementieren
[Python] Ich habe versucht, TF-IDF stetig zu berechnen
Ich habe versucht, Python zu berühren (grundlegende Syntax)
[Python] Ich habe versucht, eine stabile Sortierung zu implementieren
Ich habe Python> autopep8 ausprobiert
Implementieren Sie XENO mit Python
Ich habe versucht zu debuggen.
Implementieren Sie sum in Python
Ich habe Python> Decorator ausprobiert
Implementieren Sie Traceroute in Python 3
Ich habe versucht, das Lesen von Dataset mit PyTorch zu implementieren
Versuchen Sie, Oni Mai Tsuji Miserable mit Python zu implementieren
Python: Ich konnte in Lambda rekursieren
Ich möchte mit Python ein Fenster erstellen
Ich habe versucht, mit Python ein Tippspiel zu spielen
So implementieren Sie Shared Memory in Python (mmap.mmap)
Ich habe versucht, Keras in TFv1.1 zu integrieren
Ich habe versucht, "Birthday Paradox" mit Python zu simulieren
Geschrieben "Einführung in die Effektüberprüfung" in Python
Ich habe versucht, CloudWatch-Daten mit Python abzurufen
Ich habe versucht, LLVM IR mit Python auszugeben
Ich möchte verschachtelte Dicts in Python zusammenführen