我是靠谱客的博主 失眠绿草,最近开发中收集的这篇文章主要介绍02 强化学习——策略梯度法(PG)(连续动作)一、PG回顾二、连续动作PG算法网络三、代码实现四、训练结果,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

一、PG回顾

1、对于离散动作,策略搜索使用神经网络来参数化随机策略中的动作概率,网络的输入是智能体的当前状态,网络输出为当前所有动作的概率,该网络是一种分类网络。网络训练使用数据为一个episode数据(s,a,r). 参考https://blog.csdn.net/weixin_40493501/article/details/110384894

2、对于连续性动作来说,一般使用随机高斯策略,网络的输入是智能体当前状态,网络的输出的高斯策略的均值和标准差,网络是一个拟合网络。

无论是连续动作还是离散动作,在使用PG时,必须先弄清下面公式【主要推导上一个文章已给出】,离散动作和连续动作最大的不同就在于p(a|s,theta )

   在离散动作的网络设计中,输入是状态,输出可以看做是每个动作的概率,与动作标签进行对比,尽量使动作概率的输出接近标签。

   在连续动作中,将p(a|s,theta )看成是动作的分布,例如高斯分布

p(a|s) sim frac{1}{sqrt{2pi }}exp^{-frac{(a-mu)^{2}}{2delta ^{2}}}

二、连续动作PG算法网络

输入层为状态,连续动作空间的输出层不再是动作,二是动作描述的一种分布参数,利用输出的参数可以得到动作的分布,可以根据分布来选择动作。

【需要补充具体的计算过程】

 

三、代码实现

import tensorflow as tf
import numpy as np
import gym
import matplotlib.pyplot as plt
RENDER = False

'''
连续动作
使用的是高斯策略,用神经网络参数化高斯分布的均值和方差

状态空间为 2
动作空间为 1   其取值范围在[-2,2]之间
注意其误差的构建


'''

#利用当前策略进行采样,产生数据
class Sample():
    def __init__(self,env, policy_net):
        self.env = env
        self.policy_net=policy_net
        self.gamma = 0.95
    def sample_episodes(self, num_episodes):
        #产生num_episodes条轨迹
        batch_obs=[]
        batch_actions=[]
        batch_rs =[]
        for i in range(num_episodes):
            observation = self.env.reset()
            #将一个episode的回报存储起来
            reward_episode = []
            while True:
                # if RENDER:self.env.render()
                #根据策略网络产生一个动作
                state = np.reshape(observation,[1,3])
                action = self.policy_net.choose_action(state)
                observation_, reward, done, info = self.env.step(action)
                # print("observation",observation_)
                batch_obs.append(np.reshape(observation,[1,3])[0,:])
                # print('observation', np.reshape(observation,[1,3])[0,:])
                batch_actions.append(action)
                reward_episode.append((reward+8)/8)
                #一个episode结束
                if done:
                    #处理回报函数
                    reward_sum = 0
                    discouted_sum_reward = np.zeros_like(reward_episode)
                    for t in reversed(range(0, len(reward_episode))):
                        reward_sum = reward_sum*self.gamma + reward_episode[t]
                        discouted_sum_reward[t] = reward_sum
                    #归一化处理
                    discouted_sum_reward -= np.mean(discouted_sum_reward)
                    discouted_sum_reward/= np.std(discouted_sum_reward)
                    #将归一化的数据存储到批回报中
                    for t in range(len(reward_episode)):
                        batch_rs.append(discouted_sum_reward[t])
                        # print(discouted_sum_reward[t])
                    break
                #智能体往前推进一步
                observation = observation_
        #reshape 观测和回报
        batch_obs = np.reshape(batch_obs, [len(batch_obs), self.policy_net.n_features])
        batch_actions = np.reshape(batch_actions,[len(batch_actions),1])
        batch_rs = np.reshape(batch_rs,[len(batch_rs),1])
        return batch_obs, batch_actions,batch_rs
#定义策略网络
class Policy_Net():
    def __init__(self, env, action_bound, lr = 0.0001, model_file=None):
        self.learning_rate = lr
        #输入特征的维数
        self.n_features = env.observation_space.shape[0]
        #输出动作空间的维数
        self.n_actions = 1     # 注意动作维度只有一个
        #1.1 输入层
        self.obs = tf.placeholder(tf.float32, shape=[None, self.n_features])
        #1.2.第一层隐含层
        self.f1 = tf.layers.dense(inputs=self.obs, units=200, activation=tf.nn.relu, kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.1),
                             bias_initializer=tf.constant_initializer(0.1))
        #1.3 第二层,均值,需要注意的是激活函数为tanh,使得输出在-1~+1
        mu = tf.layers.dense(inputs=self.f1, units=self.n_actions, activation=tf.nn.tanh, kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.1),
                                       bias_initializer=tf.constant_initializer(0.1))
        #1.3 第二层,标准差
        sigma = tf.layers.dense(inputs=self.f1, units=self.n_actions, activation=tf.nn.softplus, kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.1),
                                       bias_initializer=tf.constant_initializer(0.1))
        #均值乘以2,使得均值取值范围在(-2,2)
        self.mu = 2*mu
        self.sigma =sigma
        # 定义带参数的正态分布
        self.normal_dist = tf.contrib.distributions.Normal(self.mu, self.sigma)
        #根据正态分布采样一个动作
        self.action = tf.clip_by_value(self.normal_dist.sample(1), action_bound[0],action_bound[1])
        #1.5 当前动作
        self.current_act = tf.placeholder(tf.float32, [None,1])
        self.current_reward = tf.placeholder(tf.float32, [None,1])
        #TODO 2. 构建损失函数
        log_prob = self.normal_dist.log_prob(self.current_act)
        self.loss = tf.reduce_mean(log_prob*self.current_reward+0.01*self.normal_dist.entropy())


        #3. 定义一个优化器
        self.train_op = tf.train.AdamOptimizer(self.learning_rate).minimize(-self.loss)
        #4. tf工程
        self.sess = tf.Session()
        #5. 初始化图中的变量
        self.sess.run(tf.global_variables_initializer())
        #6.定义保存和恢复模型
        self.saver = tf.train.Saver()
        if model_file is not None:
            self.restore_model(model_file)
    #依概率选择动作
    def choose_action(self, state):
        action = self.sess.run(self.action, {self.obs:state})
        return action[0]
    #定义训练
    def train_step(self, state_batch, label_batch, reward_batch):
        loss, _ =self.sess.run([self.loss, self.train_op], feed_dict={self.obs:state_batch, self.current_act:label_batch, self.current_reward:reward_batch})
        return loss
    #定义存储模型函数
    def save_model(self, model_path):
        self.saver.save(self.sess, model_path)
    #定义恢复模型函数
    def restore_model(self, model_path):
        self.saver.restore(self.sess, model_path)

def policy_train(env, brain, training_num):
    reward_sum = 0
    reward_sum_line = []
    training_time = []
    brain = brain
    env = env
    for i in range(training_num):
        temp = 0
        sampler = Sample(env, brain)
        # 采样1个episode
        train_obs, train_actions, train_rs = sampler.sample_episodes(1)
        brain.train_step(train_obs, train_actions, train_rs)
        if i == 0:
            reward_sum = policy_test(env, brain,RENDER,1)[0]
        else:
            reward_sum = 0.95 * reward_sum + 0.05 * policy_test(env, brain,RENDER,1)[0]
        # print(policy_test(env, brain))
        reward_sum_line.append(reward_sum)
        training_time.append(i)
        print("training episodes is %d,trained reward_sum is %f" % (i, reward_sum))
        if reward_sum > -200:
            break
    brain.save_model('./Pendulum_models/current_bset_pg_pendulum')
    plt.figure(1)
    plt.plot(training_time, reward_sum_line)
    plt.xlabel("training number")
    plt.ylabel("score")
    # plt.show()

def policy_test(env, policy,RENDER,test_number):
    test_reward = []
    for i in range(test_number):
        observation = env.reset()
        reward_sum = 0
        # 将一个episode的回报存储起来
        while True:
            if RENDER:
                env.render()
            # 根据策略网络产生一个动作
            state = np.reshape(observation, [1, 3])
            action = policy.choose_action(state)
            observation_, reward, done, info = env.step(action)
            reward_sum+=reward
            if done:
                break
            observation = observation_
        test_reward.append(reward_sum)

    return test_reward


    return reward_sum
if __name__=='__main__':
    #构建单摆类
    env_name = 'Pendulum-v0'
    env = gym.make(env_name)
    env.unwrapped
    env.seed(1)
    #定义力矩取值区间
    action_bound = [-env.action_space.high, env.action_space.high]
    #实例化一个策略网络
    brain = Policy_Net(env,action_bound)
    training_num = 200
    #训练策略网络
    policy_train(env, brain, training_num)
    #测试训练好的策略网络
    test_reward_sum = policy_test(env, brain, False, 20)

    plt.figure(2)
    plt.title("Test")
    plt.plot(test_reward_sum)
    plt.xlabel("Testing number")
    plt.ylabel("score")
    plt.show()

    plt.show()

 

四、训练结果

一共训练200个episodes,训练完成后在进行测试20个episode(一个episode即为一条轨迹)

测试结果:

问题:

1、从训练的回报函数来看,网络并没有收敛,并且不是训练次数的问题,测试的结果波动较大,算法没有收敛是算法本身的缺陷还是实现过程出错?

2、从实验画面显示来看,倒立摆没有立起来,如何在不改变算法的前提下来改进训练效果,即如何调节PG算法收敛

继续学习中。。。。。。

 

 

最后

以上就是失眠绿草为你收集整理的02 强化学习——策略梯度法(PG)(连续动作)一、PG回顾二、连续动作PG算法网络三、代码实现四、训练结果的全部内容,希望文章能够帮你解决02 强化学习——策略梯度法(PG)(连续动作)一、PG回顾二、连续动作PG算法网络三、代码实现四、训练结果所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(39)

评论列表共有 0 条评论

立即
投稿
返回
顶部