《深度强化学习实践》Actor-Critic
- 算法
- 一、baseline
- 原理
- 代码
- 二、Actor-Critic
- 代码
- 参数部分
- 多个环境同时
- 网络
- the same of 下面连续的对应部分
- loss
- Value
- Policy
- entroy
- 三、连续动作Actor-Critic
- 代码
- 参数
- env
- agent
- 优化器
- batch和best_reward
- wtiter
- 获取样本
- 记录
- test
- batch数据解压
- loss
- vloss
- p loss
- e loss
- 小结
- 应用
算法
一、baseline
原理
策略梯度算法
目标:通过训练,增加好动作的概率,减小不好动作被采集到的概率。
实现:
▽
J
≈
E
[
Q
(
s
,
a
)
▽
log
π
(
a
∣
s
)
]
{triangledown }Japprox E[Q(s,a){triangledown }log pi (a|s)]
▽J≈E[Q(s,a)▽logπ(a∣s)]
缺点:不稳定,收敛速度慢的缺点(因此actor-critic算法致力于解决这两个问题。)
对于稳定这个问题,从数学上,可以通过减小梯度的方差来实现。
在Reinforce方法中,使用贴现的总奖励作为策略梯度中关于net梯度前的系数。
下面主要指更新曲折。
对于一个有3种可以选择的动作的状态而言。
在情况一中,3个动作的Q值有两个正一个负,则一定会在前两个正的动作中进行选择;
在情况二中,3个动作的Q值三个都是正的,但是第三个动作的Q值特别小,但是在这种情况下,这个动作不可避免会被选到。
为了尽可能降低这种状况的影响,我们可以采取采样多个样本来应对这种情况;同时我们可以采用一种Q-Learning中提到的baseline的方法。
通过采用baseline的方法
1)、既可以产生有区分的动作,
2)、同时也可以减小训练过程中多次实验带来的方差
从而达到稳定的目的。
但是要注意,这里所说的baseline,是与状态无关的。
代码
代码如下
1、获取参数
1
2
3
4
5
6
7
8
9
10GAMMA = 0.99 LEARNING_RATE = 0.001 ENTROPY_BETA = 0.01 BATCH_SIZE = 8 REWARD_STEPS = 10 if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--baseline", default=False, action='store_true', help="Enable mean baseline") args = parser.parse_args()
2、创建环境
1
2env = gym.make("CartPole-v0")
3、记录器
1
2writer = SummaryWriter(comment="-cartpole-pg" + "-baseline=%s" % args.baseline)
4、定义网络
1
2
3net = PGN(env.observation_space.shape[0], env.action_space.n) print(net)
其中net为
class PGN(nn.Module):
def init(self, input_size, n_actions):
super(PGN, self).init()
self.net = nn.Sequential(
nn.Linear(input_size, 128),
nn.ReLU(),
nn.Linear(128, n_actions)
)
def forward(self, x):
return self.net(x)
5、定义行动者
1
2
3
4agent = ptan.agent.PolicyAgent(net, preprocessor=ptan.agent.float32_preprocessor, apply_softmax=True)
这里采用ptan中的打包好的策略智能体。
ptan.agent.float32_preprocessor表示数据格式时float32的tensor,使用softmax,默认根据概率分布选择动作。
关于softmax中维度的选择,参见链接,默认axis=1,就是得到行中最大值的索引。
6、定义“经验收集装置”
1
2exp_source = ptan.experience.ExperienceSourceFirstLast(env, agent, gamma=GAMMA, steps_count=REWARD_STEPS)
定义好之后,可以通过之后的for循环不断获取一个episode的值,其中每个episode中步数为REWARD_STEPS。
经验收集有两个函数可以实现:
ExperienceSource、ExperienceSourceFirstLast
第一个函数返回每一步的s,a,r;
第二个函数返回头状态,头动作,尾状态以及总奖励(贴现)。这里我们选择的就是第二种。
还需要注意的是:使用ExperienceSource的每个episode中我们设置了一个固定的步数。所以在一个episode中,可能并没有跑完整个任务,因此当执行下一个episode时,起始状态就是上一个episode的最后状态???
7、定义优化器
1
2optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE)
8、定义各种变量
1
2
3
4
5
6
7total_rewards = [] #记录一幕的总奖励,这里指的是一幕中共进行的episode(这里指定10步为一个episode)数. step_idx = 0 #记录当前是第几个episode done_episodes = 0 #记录完整的一幕的个数 reward_sum = 0.0 #以每次的episode为一个单位,累加 batch_states, batch_actions, batch_scales = [], [], []
9、数据收集
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38##############每走10步收集一次数据##################### for step_idx, exp in enumerate(exp_source): ##一直累加总奖励 reward_sum += exp.reward ##这里的baseline是平均奖励,与状态无关。 baseline = reward_sum / (step_idx + 1) writer.add_scalar("baseline", baseline, step_idx) ##将这10步中的状态、动作作为batch中的一条append进去 batch_states.append(exp.state) batch_actions.append(int(exp.action)) ##!!!!!当baseline不是0的时候,就将此10步中每一步都减去baseline的差值作为系数 if args.baseline: batch_scales.append(exp.reward - baseline) else: batch_scales.append(exp.reward) ##############当一幕结束时,会返回一幕中总共运行的episode数,以及最后100个episode数 # handle new rewards new_rewards = exp_source.pop_total_rewards() if new_rewards: done_episodes += 1 reward = new_rewards[0] total_rewards.append(reward) mean_rewards = float(np.mean(total_rewards[-100:])) print("%d: reward: %6.2f, mean_100: %6.2f, episodes: %d" % ( step_idx, reward, mean_rewards, done_episodes)) writer.add_scalar("reward", reward, step_idx) writer.add_scalar("reward_100", mean_rewards, step_idx) writer.add_scalar("episodes", done_episodes, step_idx) if mean_rewards > 195: print("Solved in %d steps and %d episodes!" % (step_idx, done_episodes)) break ###############当记录的数据不满BATCH_SIZE也就是8的时候,先不计算损失,等前几个状态过去以后,之后每加一个数据都会计算损失以及更新参数。 if len(batch_states) < BATCH_SIZE: continue states_v = torch.FloatTensor(batch_states) batch_actions_t = torch.LongTensor(batch_actions) batch_scale_v = torch.FloatTensor(batch_scales)
这里有几件事我们需要清楚:
1、CartPole-v0游戏奖励设定:当杆的角度大于15度的时候游戏就会终止,为了获得更多的奖励,需要动态调节小车的前进方向,以使agent可以较长时间获得奖励;
2、ExperienceSourceFirstLast虽然得到的只有初始和结束时的状态以及初始动作值,看源码实现以及print可以发现,其每一条记录(即10步)的初始状态,都是上一条记录中的第二条记录。也就是说,在电脑中游戏是一步一步运行,但是记录的确实隔一步记录此后的10步。
3、 由实验以及,以上分析可以得到exp_source.pop_total_rewards()得到的,其实是总共运行的步数。
可以通过在experiencesourcefirstlast的__iter__中print(exp)得到。
4、而exp.reward()是每一次获得的带贴现10步的奖励,这个reward才是真正训练用得到的奖励,total_rewards仅仅是为了展示总奖励以及方差等特征。
5、由于batch_value/batch_action中每次append的都是experiencesourcefirstlast中得到的初始状态以及初始动作,也就是说都是一个;又因为在后面if len(batch_states) < BATCH_SIZE: continue
以及batch_states.clear() batch_actions.clear() batch_scales.clear()
使得每次数据到达batch以后就会执行训练以及清空。
10、计算损失、优化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19states_v = torch.FloatTensor(batch_states) batch_actions_t = torch.LongTensor(batch_actions) batch_scale_v = torch.FloatTensor(batch_scales) optimizer.zero_grad() logits_v = net(states_v) log_prob_v = F.log_softmax(logits_v, dim=1) log_prob_actions_v = batch_scale_v * log_prob_v[range(BATCH_SIZE), batch_actions_t] loss_policy_v = -log_prob_actions_v.mean() loss_policy_v.backward(retain_graph=True) grads = np.concatenate([p.grad.data.numpy().flatten() for p in net.parameters() if p.grad is not None]) prob_v = F.softmax(logits_v, dim=1) entropy_v = -(prob_v * log_prob_v).sum(dim=1).mean() entropy_loss_v = -ENTROPY_BETA * entropy_v entropy_loss_v.backward() optimizer.step() loss_v = loss_policy_v + entropy_loss_v
首先是转换为tensor,优化器梯度清空。
然后是计算两个损失的梯度,分别是策略梯度损失和熵损失。
(1)、策略梯度损失:
首先通过网络得到二维数据,分别是样本数*动作数,即8*2;
然后对于每一个样本,也就是对于动作取logsoftmax,得到对于每个动作的log概率分布;
其次将(每次的10步贴现奖励-baseline)作为系数,乘以对应动作的log概率得到8条数据;
最后做了期望处理,以及最大化奖励变最小化损失后,加负号。
注意!这里其实求的是损失,但是由于我们计算的梯度确实与前面的R、b无关,所以可以将公式写作:
▽
J
1
≈
−
E
[
(
R
−
b
)
▽
log
π
(
a
∣
s
)
]
{triangledown }J1approx -E[(R-b){triangledown }log pi (a|s)]
▽J1≈−E[(R−b)▽logπ(a∣s)]
这里的关于python的技巧有:
1、梯度回传以后,防止变量的图结构丢失,通俗来讲就是先把梯度保存,等下次回传完熵损失以后一起进行优化
2、flatten()可以使numpy对象返回一维数组;np.concatenate实现对一维数组的拼接。
(2)、熵损失
其最终实现的公式为
▽
J
2
≈
E
[
β
▽
(
π
(
s
)
∗
l
o
g
π
(
s
)
)
]
{triangledown }J2approx E[{{beta }triangledown }(pi (s)*log pi (s))]
▽J2≈E[β▽(π(s)∗logπ(s))]因为熵本身前面有负号,这样最大化熵,也就是最小化正的这个公式。
最大熵公式可以保证在探索的时候尽可能随机,而不是快速收敛到局部最优。
最后一部分就是执行优化参数。
11、计算参数更新前后的相对熵(KL散度)
1
2
3
4
5
6new_logits_v = net(states_v) new_prob_v = F.softmax(new_logits_v, dim=1) kl_div_v = -((new_prob_v / prob_v).log() * prob_v).sum(dim=1).mean() writer.add_scalar("kl", kl_div_v.item(), step_idx)
如题,用来验证更新前后两个分布之间的相似程度。
关于熵的这部分:详见强化学习中的基础数学知识
12、tensorboardX
1
2
3
4
5
6
7
8
9
10
11
12writer.add_scalar("kl", kl_div_v.item(), step_idx) writer.add_scalar("baseline", baseline, step_idx) writer.add_scalar("entropy", entropy_v.item(), step_idx) writer.add_scalar("batch_scales", np.mean(batch_scales), step_idx) writer.add_scalar("loss_entropy", entropy_loss_v.item(), step_idx) writer.add_scalar("loss_policy", loss_policy_v.item(), step_idx) writer.add_scalar("loss_total", loss_v.item(), step_idx) writer.add_scalar("grad_l2", np.sqrt(np.mean(np.square(grads))), step_idx) writer.add_scalar("grad_max", np.max(np.abs(grads)), step_idx) writer.add_scalar("grad_var", np.var(grads), step_idx)
值得关注的是
前面数据收集中提到的记录的一幕episode的R以及最后100幕R的平均;
以及梯度方差。
二、Actor-Critic
代码
参数部分
GAMMA = 0.99
LEARNING_RATE = 0.001
ENTROPY_BETA = 0.01
BATCH_SIZE = 128
NUM_ENVS = 50
REWARD_STEPS = 4
CLIP_GRAD = 0.1
这里的CLIP_GRAD用来当做梯度L2范数的阈值,步数在PG中10步,但是这里用了未来的值函数,即TD,所以不用那么多。
1
2
3
4
5
6
7parser = argparse.ArgumentParser() parser.add_argument("--cuda", default=False, action="store_true", help="Enable cuda") parser.add_argument("-n", "--name", required=True, help="Name of the run") args = parser.parse_args() device = torch.device("cuda" if args.cuda else "cpu")
这部分一样。
多个环境同时
1
2
3
4
5make_env = lambda: ptan.common.wrappers.wrap_dqn(gym.make("PongNoFrameskip-v4")) envs = [make_env() for _ in range(NUM_ENVS)] writer = SummaryWriter(comment="-pong-a2c_" + args.name)
这里我感觉也不需要太纠结,环境到底是怎么定义的,只需要记住这种方法既可以,对于wrap_dqn可以看看,然而看了以后还是看不太懂,等前面看dqn再说,感觉就是把最原始的环境按照自己的想法包装再包装。下面就是这个函数,这一些分别是前一个环境作为参数作为父类,传入。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16def wrap_dqn(env, stack_frames=4, episodic_life=True, reward_clipping=True): """Apply a common set of wrappers for Atari games.""" assert 'NoFrameskip' in env.spec.id if episodic_life: env = EpisodicLifeEnv(env) env = NoopResetEnv(env, noop_max=30) env = MaxAndSkipEnv(env, skip=4) if 'FIRE' in env.unwrapped.get_action_meanings(): env = FireResetEnv(env) env = ProcessFrame84(env) env = ImageToPyTorch(env) env = FrameStack(env, stack_frames) if reward_clipping: env = ClippedRewardsWrapper(env) return env
网络
下面这个是网络模型,因为这个传入的是图片,所以需要用到卷积。
首先是三层卷积,分别用Relu函数激活,就是body部分,后面是策略函数和值函数,他们都公用面卷积的部分。这两个函数的输入是一个数字,代表输入数据的维度,其中使用np.prod将多维的tensor转为所有维度相乘的一个数字。这是作为网络结构的部分,作为真实的传入数据,是通过归一化,然后view(fx.size()[0], -1)将得到的batchsize行的一维数据,这里fx.size()的格式是(batchsize,channels,x,y)。
这里的策略函数输出的是动作的序号。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30class AtariA2C(nn.Module): def __init__(self, input_shape, n_actions): super(AtariA2C, self).__init__() self.conv = nn.Sequential( nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4), nn.ReLU(), nn.Conv2d(32, 64, kernel_size=4, stride=2), nn.ReLU(), nn.Conv2d(64, 64, kernel_size=3, stride=1), nn.ReLU() ) conv_out_size = self._get_conv_out(input_shape) self.policy = nn.Sequential( nn.Linear(conv_out_size, 512), nn.ReLU(), nn.Linear(512, n_actions) ) self.value = nn.Sequential( nn.Linear(conv_out_size, 512), nn.ReLU(), nn.Linear(512, 1) ) def _get_conv_out(self, shape): o = self.conv(torch.zeros(1, *shape)) return int(np.prod(o.size())) def forward(self, x): fx = x.float() / 256 conv_out = self.conv(fx).view(fx.size()[0], -1) return self.policy(conv_out), self.value(conv_out)
the same of 下面连续的对应部分
这一部分的一个新的发现可能是关于with里的这个if tracker.reward(new_rewards[0], step_idx): break
,还有就是with也是顺序进行的,当进行到最后就退出了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18exp_source = ptan.experience.ExperienceSourceFirstLast(envs, agent, gamma=GAMMA, steps_count=REWARD_STEPS) optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE, eps=1e-3) batch = [] with common.RewardTracker(writer, stop_reward=18) as tracker: with ptan.common.utils.TBMeanTracker(writer, batch_size=10) as tb_tracker: for step_idx, exp in enumerate(exp_source): batch.append(exp) # handle new rewards new_rewards = exp_source.pop_total_rewards() if new_rewards: if tracker.reward(new_rewards[0], step_idx): break if len(batch) < BATCH_SIZE: continue states_v, actions_t, vals_ref_v = unpack_batch(batch, net, device=device) batch.clear()
loss
Value
根据前面的分析,第二个是值函数,计算的均方误差损失就可以得到值函数的损失了。
1
2
3
4
5optimizer.zero_grad() logits_v, value_v = net(states_v) loss_value_v = F.mse_loss(value_v.squeeze(-1), vals_ref_v)
Policy
这个地方和前面baseline的方法一样
1
2
3
4
5
6log_prob_v = F.log_softmax(logits_v, dim=1) adv_v = vals_ref_v - value_v.squeeze(-1).detach() log_prob_actions_v = adv_v * log_prob_v[range(BATCH_SIZE), actions_t] loss_policy_v = -log_prob_actions_v.mean()
entroy
1
2
3
4prob_v = F.softmax(logits_v, dim=1) entropy_loss_v = ENTROPY_BETA * (prob_v * log_prob_v).sum(dim=1).mean()
三、连续动作Actor-Critic
代码
参数
一样的首先是关于参数的设置。默认关闭cuda,如果需要开启,就在终端运行的时候加上–cuda,就会因为store_true将args.cuda设置为True。-n这个参数是必须需要的,因为有required选项。
1
2
3
4
5
6
7
8
9
10
11
12#定义结构 parser = argparse.ArgumentParser() #设置需要的参数 parser.add_argument("--cuda", default=False, action='store_true', help='Enable CUDA') parser.add_argument("-n", "--name", required=True, help="Name of the run") #实例化 args = parser.parse_args() #设置运行的设备,新建文件夹 device = torch.device("cuda" if args.cuda else "cpu") save_path = os.path.join("saves", "a2c-" + args.name) os.makedirs(save_path, exist_ok=True)
env
设置两个一样的环境,用来一个用来产生数据,一个用来测试模型。
1
2
3
4env = gym.make(ENV_ID) test_env = gym.make(ENV_ID)
agent
1
2
3
4
5net = model.ModelA2C(env.observation_space.shape[0], env.action_space.shape[0]).to(device) agent = model.AgentA2C(net, device=device) exp_source = ptan.experience.ExperienceSourceFirstLast(env, agent, GAMMA, steps_count=REWARD_STEPS)
1、其中的net就是agent的核心部分,输入是状态,输出是动作的均值,方差,以及对于输入状态值函数的估计。其中的.to(device)的作用是将所有最开始读取数据时的tensor变量copy一份到device所指定的GPU上去,之后的运算都在GPU上进行。
关于策略函数
(1)之前离散状态的时候是一般只一个可以选择的动作,且这个动作有几个可以选择的值,每个值都有一定概率被选择;
(2)对于连续动作控制来说,一般是多个可以选择的动作,且每个动作的都有一个可以取值的区间。
如果我们只用网络的输出作为选择的动作,那不利于探索,为了增加随机性的成分。我们选择输出正太分布的均值和方差来描述值的概率分布。
关于值函数
值函数就是在公用网络部分self.base输出之后,再接一层,输出一个scale。
(不知为啥,两个__中间的部分被加粗了)
class ModelA2C(nn.Module):
def init(self, obs_size, act_size):
super(ModelA2C, self).init()
self.base = nn.Sequential(
nn.Linear(obs_size, HID_SIZE),
nn.ReLU(),
)
self.mu = nn.Sequential(
nn.Linear(HID_SIZE, act_size),
nn.Tanh(),
)
tanh使得均值取值在-1到1之间,这里因为我们将电机的取值也量化在0-1之间
self.var = nn.Sequential(
nn.Linear(HID_SIZE, act_size),
nn.Softplus(),
)
softplus可以看做正的平滑的relu函数
self.value = nn.Linear(HID_SIZE, 1)
直接输出作为值函数
def forward(self, x):
base_out = self.base(x)
return self.mu(base_out), self.var(base_out), self.value(base_out)
2、调用agent类
1
2
3
4
5
6
7
8
9
10
11
12
13class AgentA2C(ptan.agent.BaseAgent): def __init__(self, net, device="cpu"): self.net = net self.device = device def __call__(self, states, agent_states): states_v = ptan.agent.float32_preprocessor(states).to(self.device) mu_v, var_v, _ = self.net(states_v) mu = mu_v.data.cpu().numpy() sigma = torch.sqrt(var_v).data.cpu().numpy() actions = np.random.normal(mu, sigma) actions = np.clip(actions, -1, 1) return actions, agent_states
可以看到agent类继承了父类BaseAgent。同时调用的时候,首先给赋给他之前定义的网络,以及网络需要运行的设备。这里再明确设备是因为,在定义网络的时候会定义网络运行的设备,同时呢,我们的数据也必须在那个设备上,因此有了states_v = ptan.agent.float32_preprocessor(states).to(self.device)
将得到的mu,和开方后的方差,也就是标准差移动到cpu,使用numpy进行运算采样(因为numpy只能运行在cpu),并限制区间,得到最终的action。
3、定义迭代器
这个函数一节讲过,但是当时理解不是很深入,于是再来重新补充下。
(1)我们知道这个迭代器输出的是一条从当前状态走REWARD_STEPS步过程中的记录;
(2)这个迭代器每次返回的是“一条数据”;
(3)因为可能遇到终止状态,因此每条这样的样本中包含的实际样本数可能小于REWARD_STEPS;
(4)这个迭代器其实是具有超前视角的,就是说虽然它能提前感知到结束,但是它实际中每次只走一步,所以对于last_states而言,会有REWARD_STEPS个None
(5)还有一个重要的点,是关于这个pop_total_rewards或者pop_rewards_steps函数输出的reward,它输出的reward的来源是在class ExperienceSource的迭代中,这里面的总奖励是不贴现的,所以也可以代表步数。当然这个步数,也就是最终得到的样本数。因为像前面说的,实际中还是走了一步。因此呢,还是得等到没有实际中没有下一步可以走的时候,就会输出这个结果。
优化器
optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE)
这个没什么好说的
batch和best_reward
1
2
3
4batch = [] best_reward = None
batch用来装样本,best_reward用来放目前最有的奖励,用来判断是否保存样本。
wtiter
1
2
3
4writer = SummaryWriter(comment="-a2c_" + args.name) with ptan.common.utils.RewardTracker(writer) as tracker: with ptan.common.utils.TBMeanTracker(writer, batch_size=10) as tb_tracker:
首先是定义一个可以用来记录的tensorboard的记录器
1、第一句话是命名一个RewardTracker实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37class RewardTracker: def __init__(self, writer, min_ts_diff=1.0): """ Constructs RewardTracker :param writer: writer to use for writing stats :param min_ts_diff: minimal time difference to track speed """ self.writer = writer self.min_ts_diff = min_ts_diff def __enter__(self): self.ts = time.time() self.ts_frame = 0 self.total_rewards = [] return self def __exit__(self, *args): self.writer.close() def reward(self, reward, frame, epsilon=None): self.total_rewards.append(reward) mean_reward = np.mean(self.total_rewards[-100:]) ts_diff = time.time() - self.ts if ts_diff > self.min_ts_diff: #这里因为是当一个回合结束时才能得到frame,而这个frame是总共的步数,不会因为done清0,所以这个差值就是主函数中的steps speed = (frame - self.ts_frame) / ts_diff self.ts_frame = frame self.ts = time.time() epsilon_str = "" if epsilon is None else ", eps %.2f" % epsilon print("%d: done %d episodes, mean reward %.3f, speed %.2f f/s%s" % ( frame, len(self.total_rewards), mean_reward, speed, epsilon_str )) sys.stdout.flush() self.writer.add_scalar("speed", speed, frame) if epsilon is not None: self.writer.add_scalar("epsilon", epsilon, frame) self.writer.add_scalar("reward_100", mean_reward, frame) self.writer.add_scalar("reward", reward, frame) return mean_reward if len(self.total_rewards) > 30 else None
作为with进来的,需要有__enter__,进来就会执行,走的时候就__exit__。
这里重点来看这个reward函数。
传进来的参数从后面的代码(在下面)一节来看,有这一个回合的总建立,以及总共进行的步数。
(1)对于奖励的部分
首先添加总奖励
然后取最后100回合的平均总奖励,并记录
记录当前回合的奖励
(2)速度
这个地方比较迷
至于这个速度的表达为:
这个速度是当前这一回合步数(因为在主函数中的序号是不会停的),除以这一回合的所用的时间。
不太清楚这个值到底代表什么???
2、第二句话
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42class TBMeanTracker: """ TensorBoard value tracker: allows to batch fixed amount of historical values and write their mean into TB Designed and tested with pytorch-tensorboard in mind """ def __init__(self, writer, batch_size): """ :param writer: writer with close() and add_scalar() methods :param batch_size: integer size of batch to track """ assert isinstance(batch_size, int) assert writer is not None self.writer = writer self.batch_size = batch_size def __enter__(self): self._batches = collections.defaultdict(list) return self def __exit__(self, exc_type, exc_val, exc_tb): self.writer.close() @staticmethod def _as_float(value): assert isinstance(value, (float, int, np.ndarray, np.generic, torch.autograd.Variable)) or torch.is_tensor(value) tensor_val = None if isinstance(value, torch.autograd.Variable): tensor_val = value.data elif torch.is_tensor(value): tensor_val = value if tensor_val is not None: return tensor_val.float().mean().item() elif isinstance(value, np.ndarray): return float(np.mean(value)) else: return float(value) def track(self, param_name, value, iter_index): assert isinstance(param_name, str) assert isinstance(iter_index, int) data = self._batches[param_name] data.append(self._as_float(value)) if len(data) >= self.batch_size: self.writer.add_scalar(param_name, np.mean(data), iter_index) data.clear()
首先判断batch_size类型是否是int,writer如果不是None则通过。
接下来生成一个默认的字典,值类型是list。同时注意到这里的return self。这个语句可以用来返回实例自身,可以用于循环调用方法。见链接python中 return self的作用
_as_float是一个类内定义的函数,提供下面函数track使用。
torch.autograd.Variable是可以自动计算梯度的变量
torch.is_tensor判断是否是是否是tensor
这里首先判断是不是tensor类型,如果是并且非空,就取均值。如果是numpy类型,也取平均。这也就暗示输入的步数确实不是一个。
这里输出了float类型的值以后,首先以传入参数为键,定义空列表,然后将刚得到的float赋值给他。最终记录的是达到batch_size以后的一个回合平均步数,横坐标是总步数。
这里有个很奇怪的现象,就是上面提到的,输入的一个回合的步数确实不是一个值,可能是2或者3个或者一个,而且对应的总奖励确实也不是一个。这就很让人疑惑。
握草!!!要哭惹!!!
再次提醒,外置硬盘系统一定不要移动硬盘,要不它会悄悄掉线,然后将你写的csdn抹杀。。。。。。。。。。。。。
从来亿遍!!!
就是哈,刚才我在这个地方写了个,可把我牛逼坏了,叉会腰,是怎么回事呢?
就是我在reward函数中加了个标志位
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24def reward(self, reward, frame, epsilon=None): self.total_rewards.append(reward) mean_reward = np.mean(self.total_rewards[-100:]) ts_diff = time.time() - self.ts if ts_diff > self.min_ts_diff: #这里因为是当一个回合结束时才能得到frame,而这个frame是总共的步数,不会因为done清0,所以这个差值就是主函数中的steps speed = (frame - self.ts_frame) / ts_diff self.ts_frame = frame self.ts = time.time() epsilon_str = "" if epsilon is None else ", eps %.2f" % epsilon print("%d: done %d episodes, mean reward %.3f, speed %.2f f/s%s" % ( frame, len(self.total_rewards), mean_reward, speed, epsilon_str )) sys.stdout.flush() self.writer.add_scalar("speed", speed, frame) else: print("恭喜找到bug") if epsilon is not None: self.writer.add_scalar("epsilon", epsilon, frame) self.writer.add_scalar("reward_100", mean_reward, frame) self.writer.add_scalar("reward", reward, frame) return mean_reward if len(self.total_rewards) > 30 else None
然后结果是这样
就是说原因是因为时间没有达到要求,因此没有出发输出函数。
第二个叉会腰呢是因为,顺藤摸瓜,我发现之前的平均步数的函数的理解也是不对的,也是标志位:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19def _as_float(value): # print('dfsdfdsf',value) assert isinstance(value, (float, int, np.ndarray, np.generic, torch.autograd.Variable)) or torch.is_tensor(value) tensor_val = None if isinstance(value, torch.autograd.Variable): tensor_val = value.data elif torch.is_tensor(value): tensor_val = value if tensor_val is not None: print('eee',tensor_val.float().mean().item()) return tensor_val.float().mean().item() elif isinstance(value, np.ndarray): print('rrrrr') return float(np.mean(value)) else: print('ttttt') return float(value)
得到下图
同样的东西写两遍真是恶心。。。
不多说了,自己看。
不对,还有个总结
-------------总结------------
总的来说
首先定义奖励函数,用来记录当前奖励以及平均奖励
然后定义步数函数,用来记录一个batch样本中的平均回合步数
获取样本
就是说我们获取的是一个包含起始状态与2步以后的状态,以及动作,奖励的数据,与是否是终止状态无关,或者说没有太大关系,唯一的关系是,可能一个样本记录的中间步数少于2步,当在接近终止状态时。
1
2for step_idx, exp in enumerate(exp_source):
从迭代器中取出一条上面介绍的样本。其中包括样本的序号(从不清0),以及样本数据。
记录
这就用到上面介绍的记录器了
1
2
3
4
5
6
7
8rewards_steps = exp_source.pop_rewards_steps() if rewards_steps: #zip多个可迭代对象的对应位置元素打包组成元祖对象,list(zip)可以转为数组,zip(*list(zip))可以将可迭代对象进行解压成多个列表 rewards, steps = zip(*rewards_steps) tb_tracker.track("episode_steps", steps[0], step_idx) tracker.reward(rewards[0], step_idx)
这里弹出的是回合奖励和回合步数的打包成的元祖对象,得到返回之后,首先进行解包,然后将当前奖励和(回合数、总步数)发给记录器进行记录。
test
这一段是测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17if step_idx % TEST_ITERS == 0: ts = time.time() rewards, steps = test_net(net, test_env, device=device) print("Test done is %.2f sec, reward %.3f, steps %d" % ( time.time() - ts, rewards, steps)) writer.add_scalar("test_reward", rewards, step_idx) writer.add_scalar("test_steps", steps, step_idx) if best_reward is None or best_reward < rewards: if best_reward is not None: print("Best reward updated: %.3f -> %.3f" % (best_reward, rewards)) name = "best_%+.3f_%d.dat" % (rewards, step_idx) fname = os.path.join(save_path, name) #这里是保存的模型的参数,到指定的.bat文件 torch.save(net.state_dict(), fname) best_reward = rewards
其中测试网络如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17def test_net(net, env, count=10, device="cpu"): rewards = 0.0 steps = 0 for _ in range(count): obs = env.reset() while True: obs_v = ptan.agent.float32_preprocessor([obs]).to(device) mu_v = net(obs_v)[0] action = mu_v.squeeze(dim=0).data.cpu().numpy() action = np.clip(action, -1, 1) obs, reward, done, _ = env.step(action) rewards += reward steps += 1 if done: break return rewards / count, steps / count
这里是在cpu上定义了
哎wc,我要枯了啊,我昨晚写的又没保存。。。r哦
哎,真心枯了。。。
再来亿遍
以上是测试网络,网路里直接将均值作为动作。当平均奖励大于最好的,就更新。
batch数据解压
1
2
3
4
5states_v, actions_v, vals_ref_v = common.unpack_batch_a2c(batch, net, last_val_gamma=GAMMA ** REWARD_STEPS, device=device) batch.clear()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36def unpack_batch_a2c(batch, net, last_val_gamma, device="cpu"): """ Convert batch into training tensors :param batch: :param net: :return: states variable, actions tensor, reference values variable """ states = [] actions = [] rewards = [] not_done_idx = [] last_states = [] for idx, exp in enumerate(batch): states.append(exp.state) actions.append(exp.action) rewards.append(exp.reward) #当最后一个状态 if exp.last_state is not None: not_done_idx.append(idx) last_states.append(exp.last_state) states_v = ptan.agent.float32_preprocessor(states).to(device) actions_v = torch.FloatTensor(actions).to(device) print(not_done_idx) # handle rewards rewards_np = np.array(rewards, dtype=np.float32) if not_done_idx: last_states_v = ptan.agent.float32_preprocessor(last_states).to(device) #得到网络中输出的当前状态的值函数 last_vals_v = net(last_states_v)[2] #只要二维数组的第一个维度的值 last_vals_np = last_vals_v.data.cpu().numpy()[:, 0] #得到初始状态的值函数V(s)=r+yr+yyV(s+1) rewards_np[not_done_idx] += last_val_gamma * last_vals_np ref_vals_v = torch.FloatTensor(rewards_np).to(device) return states_v, actions_v, ref_vals_v
这个地方我当时还测试了一下,确实就是如我所料,就是如下图所示,选5步的时候,中间5个没有。
还有就是几个需要注意的地方:
(1)vs = r+yr+yyvs+1…
(2)有的是ptan.agent.float32_preprocessor(last_states).to(device)
有的直接torch.FloatTensor(actions).to(device),看源码,两者的区别就是前者首先将数据变为ndarry再tensor。
(3)输出为s,a,vs(真)
loss
vloss
1
2
3
4
5optimizer.zero_grad() mu_v, var_v, value_v = net(states_v) loss_value_v = F.mse_loss(value_v.squeeze(-1), vals_ref_v)
就是显然的均方误差损失,真实值用的是TD。
p loss
1
2
3
4
5adv_v = vals_ref_v.unsqueeze(dim=-1) - value_v.detach() log_prob_v = adv_v * calc_logprob(mu_v, var_v, actions_v) loss_policy_v = -log_prob_v.mean()
其中calc_logprob为
1
2
3
4
5def calc_logprob(mu_v, var_v, actions_v): p1 = - ((mu_v - actions_v) ** 2) / (2*var_v.clamp(min=1e-3)) p2 = - torch.log(torch.sqrt(2 * math.pi * var_v)) return p1 + p2
就是正常的的最大化的过程,后面在p loss中再乘以系数,并最小化损失。
e loss
1
2entropy_loss_v = ENTROPY_BETA * (-(torch.log(2*math.pi*var_v) + 1)/2).mean()
然后最后就是加起来优化。
终于结束了,,,闹心。接下来才是第二部分Actor-Critic,我是跳着看的。。。
小结
应用
最后
以上就是重要信封最近收集整理的关于《深度强化学习实践》Actor-Critic算法小结应用的全部内容,更多相关《深度强化学习实践》Actor-Critic算法小结应用内容请搜索靠谱客的其他文章。
发表评论 取消回复