强化学习介绍

根据不同的分列方法可以将强化学习算法分成不同的种类:
1.基于概率(policy-based)和基于价值(value-based)

基于概率是强化学习中最直接的一种, 他能通过感官分析所处的环境, 直接输出下一步要采取的各种动作的概率, 然后根据概率采取行动, 所以每种动作都有可能被选中, 只是可能性不同. 而基于价值的方法输出则是所有动作的价值, 我们会根据最高价值来选着动作, 相比基于概率的方法, 基于价值的决策部分更为铁定, 毫不留情, 就选价值最高的, 而基于概率的, 即使某个动作的概率最高, 但是还是不一定会选到他.

image-20210312210405206

其中policy-based中的典型算法有Policy Gradients,value-based的典型算法有Q-learning、SARSA、DQN,两者重合的典型模型有AC、A2C、A3C

2.在线学习(on-policy)和离线学习(off-policy)

所谓在线学习(on-policy), 就是指我必须本人在场, 并且一定是本人边玩边学习, 学习者与环境必须产生实际的交互。

而离线学习(off-policy)是你可以选择自己玩, 也可以选择看着别人玩, 通过看别人玩来学习别人的行为准则。

on-policy的典型算法是SARSA, off-policy的典型算法是Q-learning、DQN。

on-policy是的学习者必学进行完一系列实际动作后才能产生样本,这样效率往往较慢。off-policy可以从以往的经验或别人的动作开学习,效率往往比较高。

image-20210312210627505

3.model-based和model-free
可以将所有强化学习的方法分为理不理解所处环境,如果我们不尝试去理解环境, 环境给了我们什么就是什么. 我们就把这种方法叫做 model-free, 这里的 model 就是用模型来表示环境, 那理解了环境也就是学会了用一个模型来代表环境, 所以这种就是 model-based 方法.

image-20210312210824998

SARSA算法原理

强化学习的主要功能就是让agent学习尽可能好的动作action,使其后续获得的奖励尽可能的大。

假设在时刻t时,处于状态长期奖励为:
在这里插入图片描述

image-20210312211035990image-20210312211102633

image-20210312213327171

image-20210312213401080

image-20210312213418619

二、SARSA代码

此处直接参考莫烦python的强化学习教程进行代码编写,在基础上说明每一行代码的用途

1.environment的编写
首先RL需要一个环境,因为我们控制不了环境(比如下围棋时我们不不能改变棋盘的大小,何落子方式,只能只能在范围内落在线与线之间的交叉点上),这个环境是不可以改变的,因此后面的Q-learning也将沿用此环境。通常不同的问题有不同环境,我们真正需要关注的是agent即算法逻辑的编写。
此处以走方格为例编写一个environment

image-20210312215227492

maze_env

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""
Reinforcement learning maze example.
Red rectangle: explorer.
Black rectangles: hells [reward = -1].
Yellow bin circle: paradise [reward = +1].
All other states: ground [reward = 0].
This script is the environment part of this example. The RL is in RL_brain.py.
View more on my tutorial page: https://morvanzhou.github.io/tutorials/
"""


import numpy as np
import time
import sys
if sys.version_info.major == 2:
import Tkinter as tk
else:
import tkinter as tk


UNIT = 40 # pixels
MAZE_H = 4 # grid height
MAZE_W = 4 # grid width


class Maze(tk.Tk, object):
def __init__(self):
super(Maze, self).__init__()
self.action_space = ['u', 'd', 'l', 'r']
self.n_actions = len(self.action_space)
self.title('maze')
self.geometry('{0}x{1}'.format(MAZE_H * UNIT, MAZE_H * UNIT))
self._build_maze()

def _build_maze(self):
self.canvas = tk.Canvas(self, bg='white',
height=MAZE_H * UNIT,
width=MAZE_W * UNIT)

# create grids
for c in range(0, MAZE_W * UNIT, UNIT):
x0, y0, x1, y1 = c, 0, c, MAZE_H * UNIT
self.canvas.create_line(x0, y0, x1, y1)
for r in range(0, MAZE_H * UNIT, UNIT):
x0, y0, x1, y1 = 0, r, MAZE_W * UNIT, r
self.canvas.create_line(x0, y0, x1, y1)

# create origin
origin = np.array([20, 20])

# hell
hell1_center = origin + np.array([UNIT * 2, UNIT])
self.hell1 = self.canvas.create_rectangle(
hell1_center[0] - 15, hell1_center[1] - 15,
hell1_center[0] + 15, hell1_center[1] + 15,
fill='black')
# hell
hell2_center = origin + np.array([UNIT, UNIT * 2])
self.hell2 = self.canvas.create_rectangle(
hell2_center[0] - 15, hell2_center[1] - 15,
hell2_center[0] + 15, hell2_center[1] + 15,
fill='black')

# create oval
oval_center = origin + UNIT * 2
self.oval = self.canvas.create_oval(
oval_center[0] - 15, oval_center[1] - 15,
oval_center[0] + 15, oval_center[1] + 15,
fill='yellow')

# create red rect
self.rect = self.canvas.create_rectangle(
origin[0] - 15, origin[1] - 15,
origin[0] + 15, origin[1] + 15,
fill='red')

# pack all
self.canvas.pack()

def reset(self):
self.update()
time.sleep(0.5)
self.canvas.delete(self.rect)
origin = np.array([20, 20])
self.rect = self.canvas.create_rectangle(
origin[0] - 15, origin[1] - 15,
origin[0] + 15, origin[1] + 15,
fill='red')
# return observation
return self.canvas.coords(self.rect)

def step(self, action):
s = self.canvas.coords(self.rect)
base_action = np.array([0, 0])
if action == 0: # up
if s[1] > UNIT:
base_action[1] -= UNIT
elif action == 1: # down
if s[1] < (MAZE_H - 1) * UNIT:
base_action[1] += UNIT
elif action == 2: # right
if s[0] < (MAZE_W - 1) * UNIT:
base_action[0] += UNIT
elif action == 3: # left
if s[0] > UNIT:
base_action[0] -= UNIT

self.canvas.move(self.rect, base_action[0], base_action[1]) # move agent

s_ = self.canvas.coords(self.rect) # next state

# reward function
if s_ == self.canvas.coords(self.oval):
reward = 1
done = True
s_ = 'terminal'
elif s_ in [self.canvas.coords(self.hell1), self.canvas.coords(self.hell2)]:
reward = -1
done = True
s_ = 'terminal'
else:
reward = 0
done = False

return s_, reward, done

def render(self):
time.sleep(0.1)
self.update()

def update():
for t in range(10):
s = env.reset()
while True:
env.render()
a = 1
s, r, done = env.step(a)
if done:
break

if __name__ == '__main__':
env = Maze()
env.after(100, update)
env.mainloop()

agent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
from maze_env import Maze    #即为上面的environment
import numpy as np
import pandas as pd

#RL的父类定义
class RL(object):
#初始化
#actions为可选动作, learning_rate为学习率,reward_decay为传递奖励是的递减系数gamma,1-e_greed为随机选择其他动作的概率
def __init__(self, actions, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9):
self.actions = actions
self.lr = learning_rate
self.gamma = reward_decay
self.epsilon = e_greedy
#初始化qtable,行为observation的state, 列为当前状态可以选择的action(对于所有列,可以选择的action一样)
self.q_table = pd.DataFrame(columns = self.actions, dtype=np.float64)

def choose_action(self, observation):
self.check_state_exist(observation) #检查当前状态是否存在,不存在就添加这个状态

if np.random.uniform() < self.epsilon:
state_action = self.q_table.loc[observation, :] #找到当前状态可以选择的动作
#由于初始化或更新后一个状态下的动作值可能是相同的,为了避免每次都选择相同动作,用random.choice在值最大的action中损及选择一个
action = np.random.choice(state_action[state_action==np.max(state_action)].index)

else:
action = np.random.choice(self.actions) #0.1的几率随机选择动作
return action

def check_state_exist(self, state):
if state not in self.q_table.index:
#若找不到该obversation的转态,则添加该状态到新的qtable
#新的state的动作的q初始值赋值为0,列名为dataframe的列名,index为state
self.q_table = self.q_table.append(pd.Series([0]*len(self.actions), index=self.q_table.columns, name=state))

#不同方式的学习方法不同,用可变参数,直接pass
def learning(self, *args):
pass
class SarsaTable(RL): #继承上面的RL
#初始化
#参数自己定义,含义继承父类RL
#类方法choose_action、check_state_exist自动继承RL,参数不变
def __init__(self, actions, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9):
super(SarsaTable, self).__init__(actions, learning_rate, reward_decay, e_greedy)

def learning(self, s, a,r, s_, a_):
self.check_state_exist(s_) #检查动作后状态s_是否存在

q_old = self.q_table.loc[s, a] #旧的q[s,a]值

if s_!='terminal':
#取下个状态s_和动作a_下q值
q_predict = self.q_table.loc[s_, a_]
q_new = r+self.gamma*q_predict #计算新的值
else:
q_new = r

self.q_table.loc[s,a] = q_old - self.lr*(q_new - q_old) #根据更新公式更新,类似于梯度下降

def update():
for episode in range(100):
#初始化环境
observation = env.reset()

#根据当前状态选行为
action = RL.choose_action(str(observation))

while True:
# 刷新环境
env.render()

# 在环境中采取行为, 获得下一个 state_ (obervation_), reward, 和是否终止
observation_, reward, done = env.step(action)

#根据observation_选择observation_下应该选择的动作action_
action_ = RL.choose_action(str(observation_))

#从当前状态state,当前动作action,奖励r,执行动作后state_,state_下的action_,(s,a,r,s,a)
RL.learning(str(observation), action, reward, str(observation_), action_)

# 将下一个当成下一步的 state (observation) and action。
#与qlearning的却别是sarsa在observation_下真正执行了动作action_,供下次使用
#而qlearning中下次状态observation_时还要重新选择action_
observation = observation_
action = action_

# 终止时跳出循环
if done:
break

# 大循环完毕
print('game over')
env.destroy()


if __name__ == "__main__":
env = Maze()

#Sarsa和SarsaLambda的调用方式一模一样
#RL = SarsaTable(actions=list(range(env.n_actions)))
RL = SarsaLambdaTable(actions=list(range(env.n_actions)))

env.after(100, update)
env.mainloop()

Q-learning、

image-20210315201907446

在这里插入图片描述

image-20210315202102199

用同一个env

qlearnning-agent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from maze_env import Maze
import numpy as np
import pandas as pd
import tensorflow as tf


# RL的父类定义
class RL(object):
# 初始化
# actions为可选动作, learning_rate为学习率,reward_decay为传递奖励是的递减系数gamma,1-e_greed为随机选择其他动作的概率
def __init__(self, actions, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9):
self.actions = actions
self.lr = learning_rate
self.gamma = reward_decay
self.epsilon = e_greedy
# 初始化qtable,行为observation的state, 列为当前状态可以选择的action(对于所有列,可以选择的action一样)
self.q_table = pd.DataFrame(columns=self.actions, dtype=np.float64)

def choose_action(self, observation):
self.check_state_exist(observation) # 检查当前状态是否存在,不存在就添加这个状态

if np.random.uniform() < self.epsilon:
state_action = self.q_table.loc[observation, :] # 找到当前状态可以选择的动作
# 由于初始化或更新后一个状态下的动作值可能是相同的,为了避免每次都选择相同动作,用random.choice在值最大的action中损及选择一个
action = np.random.choice(state_action[state_action == np.max(state_action)].index)

else:
action = np.random.choice(self.actions) # 0.1的几率随机选择动作
return action

def check_state_exist(self, state):
if state not in self.q_table.index:
# 若找不到该obversation的转态,则添加该状态到新的qtable
# 新的state的动作的q初始值赋值为0,列名为dataframe的列名,index为state
self.q_table = self.q_table.append(
pd.Series([0] * len(self.actions), index=self.q_table.columns, name=state))

# 不同方式的学习方法不同,用可变参数,直接pass
def learning(self, *args):
pass


# QLearning继承RL
class QLearningTable(RL):
# 初始化
# 参数自己定义,含义继承父类RL
# 类方法choose_action、check_state_exist自动继承RL,参数不变
def __init__(self, actions, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9):
super(QLearningTable, self).__init__(actions, learning_rate, reward_decay, e_greedy)

# 根绝当前观察状态s,选择动作a,选择动作后的奖励r,和执行动作后的状态s_,来更新qtable
def learning(self, s, a, r, s_):
self.check_state_exist(s_) # 检查动作后状态s_是否存在

q_old = self.q_table.loc[s, a] # 旧的q[s,a]值

if s_ != 'terminal':
# 下个状态下最大的值
max_s_ = self.q_table.loc[s_, :].max()
q_new = r + self.gamma * max_s_ # 计算新的值
else:
q_new = r

self.q_table.loc[s, a] = q_old - self.lr * (q_new - q_old) # 根据更新公式更新,类似于梯度下降


def update():
for episode in range(100):
# 初始化 state 的观测值
observation = env.reset() # 每轮训练都要初始化观测值,即回到原点状态

while True:
env.render()

# RL 大脑根据 state 的观测值挑选 action
action = RL.choose_action(str(observation)) # qlearning采用greeed方法,选择q值最大的action

# 探索者在环境中实施这个 action, 并得到环境返回的下一个 state 观测值, reward 和 done (是否是掉下地狱或者升上天堂)
# 是根据当前选择动作,观察到的采取动作后的状态和奖励
observation_, reward, done = env.step(action)

# RL 从这个序列 (state, action, reward, state_) 中学习
# 根绝旧observation的q值,和采取动作,以及奖励和采取动作后的observation_的最大q值进行更新
RL.learning(str(observation), action, reward, str(observation_))

# 将下一个 state 的值传到下一次循环
observation = observation_

if done:
break

# 结束游戏并关闭窗口
print('game over')
env.destroy()


if __name__ == "__main__":
# 定义环境 env 和 RL 方式
env = Maze()
RL = QLearningTable(actions=list(range(env.n_actions)))
# 开始可视化环境 env
env.after(100, update)
env.mainloop()

唯一不同是on ->off

并未真正执行a’,而是选择所有a中Q值最大的一项;

DQN

image-20210315202734362

image-20210315203248027

image-20210315203406102

DQN的算法

在这里插入图片描述

image-20210315204554812

学习策略:observation

image-20210315213257698

更新方式一:更新慢,需要将所有的动作执行完才能更新

image-20210315205418821

在这里插入图片描述更新方式二

在这里插入图片描述

image-20210315211111470

image-20210315211131717

image-20210315221326359

image-20210315221342578

但上面算法中,也有一些与Q-learning不同的地方,这是使DQN变得更加有效和技巧:

image-20210315221711848

image-20210315221729557

Dobule DQN

image-20210316191813210

image-20210316193812638

image-20210316193825686

image-20210316193923094

image-20210316193946037

image-20210316194102502

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
class Double_DeepQNetwork(object):
#replace_target_iter为更新target network的步数,防止target network和eval network差别过大
#memory_size为buffer储存记忆上线,方便使用以前记忆学习
def __init__(self, n_actions, n_features,learning_rate=0.01,reward_decay=0.9,e_greedy=0.9,replace_target_iter=300,memory_size=500,batch_size=32,e_greedy_increment=None,output_graph=False):
self.n_actions = n_actions
self.n_features = n_features
self.lr = learning_rate
self.gamma = reward_decay
self.epsilon_max = e_greedy # epsilon后面奖励对前面的递减参数
self.replace_target_iter = replace_target_iter # 更换 target_net 的步数
self.memory_size = memory_size # 记忆上限
self.batch_size = batch_size # 每次更新时从 memory 里面取多少记忆出来
self.epsilon_increment = e_greedy_increment # epsilon 的增量
#epsilon = 0等于0时,后面的奖励创传不到前面,前面的状态就开启随机探索模式
self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max # 是否开启探索模式, 并逐步减少探索次数

# 记录学习次数 (用于判断是否更换 target_net 参数)
self.learn_step_counter = 0

# 初始化全 0 记忆 [s, a, r, s_], 实际上feature为状态的维度,n_features*2分别记录s和s_,+2记录a和r
self.memory = np.zeros((self.memory_size, n_features*2+2))

self._build_net()

#替换 target net 的参数
t_params = tf.get_collection('target_net_params') #提取 target_net 的参数
e_params = tf.get_collection('eval_net_params') # 提取 eval_net 的参数
#将eval_network中每一个variable的值赋值给target network的对应变量
self.replace_target_op = [tf.assign(t, e) for t, e in zip(t_params, e_params)] #更新 target_net 参数


self.sess = tf.Session()

if output_graph:
tf.summary.FileWriter("logs/", self.sess.graph)

self.sess.run(tf.global_variables_initializer())
#用于记录# 记录所有 cost 变化
self.cost_his = []

#李宏毅老师克重的relpay buffer,通过以往的记忆中不断训练
#这是DQN变为off-policy的核心
def store_transition(self, s, a, r, s_):
#如果DeepQNetwork中定义了memory_counter,进行记忆存储
if not hasattr(self, 'memory_counter'):
self.memory_counter = 0
#记录一条 [s, a, r, s_] 记录
transition = np.hstack((s, [a, r], s_))

#总 memory 大小是固定的, 如果超出总大小, 旧 memory 就被新 memory 替换
index = self.memory_counter % self.memory_size #类似hashmap赋值思想
self.memory[index, :] = transition #进行替换

self.memory_counter += 1

#建立神经网络
#此处建立两个申请网络,一个为target network,用于得到q现实。一个为eval_network,用于得到q估计
#target network和eval_network结构一样,target network用比较老的参数,eval_network为真正训练的神经网络
def _build_net(self):
tf.reset_default_graph() #清空计算图

#创建eval神经网络,及时提升参数
self.s = tf.placeholder(tf.float32, [None, self.n_features], name='s') # 用来接收 observation,即神经网络的输入
self.q_target = tf.placeholder(tf.float32, [None, self.n_actions], name='Q_target') # q_target的值, 这个之后会通过计算得到,神经网络的输出
#eval_net域下的变量
with tf.variable_scope('eval_net'):
#c_names用于在一定步数之后更新target network
#GLOBAL_VARIABLES作用是collection默认加入所有的Variable对象,用于共享
c_names = ['eval_net_params', tf.GraphKeys.GLOBAL_VARIABLES]
n_l1 = 10 #n_l1为network隐藏层神经元的个数
w_initializer = tf.random_normal_initializer(0.,0.3)
b_initializer = tf.constant_initializer(0.1)

#eval_network第一层全连接神经网络
with tf.variable_scope('l1'):
w1 = tf.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer, collections=c_names)
b1 = tf.get_variable('b1', [1, n_l1], initializer=b_initializer, collections=c_names)
l1 = tf.nn.relu(tf.matmul(self.s, w1)+b1)

#eval_network第二层全连接神经网络
with tf.variable_scope('l1'):
w2 = tf.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names)
b2 = tf.get_variable('b2', [1, self.n_actions], initializer=b_initializer, collections=c_names)
#求出q估计值,长度为n_actions的向量
self.q_eval = tf.matmul(l1, w2) + b2

with tf.variable_scope('loss'): # 求误差
#使用平方误差
self.loss = tf.reduce_mean(tf.squared_difference(self.q_target, self.q_eval))

with tf.variable_scope('train'): # 梯度下降
optimizer = tf.train.RMSPropOptimizer(self.lr)
self._train_op = optimizer.minimize(self.loss)

#创建target network,输入选择一个action后的状态s_,输出q_target
self.s_ = tf.placeholder(tf.float32, [None, self.n_features], name='s_') # 接收下个 observation
with tf.variable_scope('target_net'):
c_names = ['target_net_params', tf.GraphKeys.GLOBAL_VARIABLES]

# target_net 的第一层fc, collections 是在更新 target_net 参数时会用到
with tf.variable_scope('l1'):
w1 = tf.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer, collections=c_names)
b1 = tf.get_variable('b1', [1, n_l1], initializer=b_initializer, collections=c_names)
l1 = tf.nn.relu(tf.matmul(self.s_, w1) + b1)

# target_net 的第二层fc
with tf.variable_scope('l2'):
w2 = tf.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names)
b2 = tf.get_variable('b2', [1, self.n_actions], initializer=b_initializer, collections=c_names)
#申请网络输出
self.q_next = tf.matmul(l1, w2) + b2

print(self.q_next)

def choose_action(self, observation):
#根据observation(state)选行为
#使用eval network选出state下的行为估计
#将observation的shape变为(1, size_of_observation),行向量变为列向量才能与NN维度统一
observation = observation[np.newaxis, :]

if np.random.uniform() < self.epsilon:
action_value = self.sess.run(self.q_eval, feed_dict={self.s:observation})
action = np.argmax(action_value)

else:
action = np.random.randint(0, self.n_actions) #随机选择

return action

def learn(self):
if self.learn_step_counter % self.replace_target_iter ==0:
self.sess.run(self.replace_target_op)
print('\ntarget_params_replaced\n')

#从memory中随机抽取batch_size这么多记忆
if self.memory_counter > self.memory_size: #说明记忆库已经存满,可以从记忆库任意位置收取
sample_index = np.random.choice(self.memory_size, size=self.batch_size)
else: #记忆库还没有存满,从现有的存储记忆提取
sample_index = np.random.choice(self.memory_counter, size=self.batch_size)

batch_memory= self.memory[sample_index, :]

# 获取q_next即q现实(target_net产生的q)和q_eval(eval_net产生的q)
#q_next和q_eval都是一个向量,包含了对应状态下所有动作的q值
#实际上feature为状态的维度,batch_memory[:, -self.n_features:]为s_,即状态s采取动作action后的状态s_, batch_memory[:, :self.n_features]为s

# 获取q_next即q现实(target_net产生的q)和q_eval(eval_net产生的q)
#q_next和q_eval都是一个向量,包含了对应状态下所有动作的q值
#实际上feature为状态的维度,batch_memory[:, -self.n_features:]为s_,即状态s采取动作action后的状态s_, batch_memory[:, :self.n_features]为s
#q_next, q_eval的维度为[None,n_actions]
q_next, q_eval = self.sess.run([self.q_next, self.q_eval], feed_dict={self.s_: batch_memory[:, -self.n_features:],self.s: batch_memory[:, :self.n_features]})

#用t+1的状态带入eval network先选出动作
action_value = self.sess.run(self.q_eval, feed_dict={self.s: batch_memory[:, -self.n_features:]})
#维度为[batch_size, 1]
action = np.argmax(action_value, axis=1)

#下面这几步十分重要. q_next, q_eval 包含所有 action 的值, 而我们需要的只是已经选择好的 action 的值, 其他的并不需要.所以我们将其他的 action 值全变成 0, 将用到的 action 误差值 反向传递回去, 作为更新凭据.
#这是我们最终要达到的样子, 比如 q_target - q_eval = [1, 0, 0] - [-1, 0, 0] = [2, 0, 0]
# q_eval = [-1, 0, 0] 表示这一个记忆中有我选用过 action 0, 而action0带来的 Q(s, a0)=-1,而其他的 Q(s, a1)=Q(s, a2)=0
# q_target = [1, 0, 0] 表示这个记忆中的 r+gamma*maxQ(s_) = 1, 而且不管在 s_ 上我们取了哪个 action
# 我们都需要对应上 q_eval 中的 action 位置, 所以就将 q_target的1放在了 action0的位置.

# 下面也是为了达到上面说的目的, 不过为了更方面让程序运算, 达到目的的过程有点不同.# 是将 q_eval 全部赋值给 q_target, 这时 q_target-q_eval 全为 0,
# 不过 我们再根据 batch_memory 当中的 action 这个 column 来给 q_target 中的对应的 memory-action 位置来修改赋值.
# 使新的赋值为 reward + gamma * maxQ(s_), 这样 q_target-q_eval 就可以变成我们所需的样子.
q_target = q_eval.copy()
#每个样本下标
batch_index = np.arange(self.batch_size, dtype=np.int32)
#记录每个样本执行的动作
eval_act_index = batch_memory[:, self.n_features].astype(int)
#记录每个样本动作的奖励
reward = batch_memory[:, self.n_features + 1]
#生成每个样本中q值对应动作的更新,即生成的q现实,
q_target[batch_index, eval_act_index]=reward+self.gamma * q_next[batch_index, action]

#假如在这个 batch 中, 我们有2个提取的记忆, 根据每个记忆可以生产3个 action 的值:
#q_eval =[[1, 2, 3],[4, 5, 6]], 另q_target = q_eval.copy()
#然后根据 memory 当中的具体 action 位置来修改 q_target 对应 action 上的值:
#比如在:记忆 0 的 q_target 计算值是 -1, 而且我用了 action 0;忆1的 q_target 计算值是-2, 而且我用了 action 2:
#q_target =[[-1, 2, 3],[4, 5, -2]]
#所以 (q_target - q_eval) 就变成了:[[(-1)-(1), 0, 0],[0, 0, (-2)-(6)]]
#最后我们将这个 (q_target - q_eval) 当成误差, 反向传递会神经网络
#所有为 0 的 action 值是当时没有选择的 action, 之前有选择的 action 才有不为0的值.
#我们只反向传递之前选择的 action 的值,
_, self.cost = self.sess.run([self._train_op, self.loss],feed_dict={self.s: batch_memory[:, :self.n_features],self.q_target: q_target})

self.cost_his.append(self.cost) # 记录 cost 误差

#每调用一次learn,降低一次epsilon,即行为随机性
self.epsilon = self.epsilon + self.epsilon_increment if self.epsilon < self.epsilon_max else self.epsilon_max

self.learn_step_counter += 1

def plot_cost(self):
import matplotlib.pyplot as plt
plt.plot(np.arange(len(self.cost_his)), self.cost_his)
plt.ylabel('Cost')
plt.xlabel('training steps')
plt.show()

Double-DQN的agent编写与DQN几乎一样只是在求q估计的时候先用eval network求出价值最大的动作,再讲这个动作带入target network。
之前在DQN中使用的公式为

1
2
3
4
5
6
7
q_target[batch_index, eval_act_index]=reward+self.gamma * np.max(q_next, axis=1)
#改为:
#用t+1的状态带入eval network先选出动作
action_value = self.sess.run(self.q_eval, feed_dict={self.s: batch_memory[:, -self.n_features:]})
#维度为[batch_size, 1]
action = np.argmax(action_value, axis=1)
q_target[batch_index, eval_act_index]=reward+self.gamma * q_next[batch_index, action]

Duling DQN

image-20210316204901396

image-20210316204930844

image-20210316205031940

Dueling DQN与DQN的网络结构不同,其他过程相似。着重是更改原来DQN Agent的build_net()方法。
之前构建的方式是通过一个隐藏层直接获得q值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
with tf.variable_scope('l1'):
w2 = tf.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names)
b2 = tf.get_variable('b2', [1, self.n_actions], initializer=b_initializer, collections=c_names)
#求出q估计值,长度为n_actions的向量
self.q_eval = tf.matmul(l1, w2) + b2

#现在将q_eval 拆分为状态价值v和动作价值a,其中动作价值a的均值为0(q_target需要做同样的改动):

#状态的价值
with tf.variable_scope('value'):
w21 = tf.get_variable('w21', [n_l1, 1], initializer=w_initializer, collections=c_names)
b21 = tf.get_variable('b21', [1], initializer=b_initializer, collections=c_names)
vs_out = tf.matmul(l1, w21) + b21

#动作的advantage
with tf.variable_scope('advantage'):
w22 = tf.get_variable('w22', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names)
b22 = tf.get_variable('b22', [1, self.n_actions], initializer=b_initializer, collections=c_names)
aa = tf.matmul(l1, w22) + b22
#为了不让A直接学成了Q, 我们减掉了A的均值,此时A的均值始终为0
aa_out = aa - tf.reduce_mean(aa, axis=1, keep_dims=True)

#合并V和A, 求出q估计值
with tf.variable_scope('Q'):
self.q_eval = vs_out + aa_out


完整的dueling DQN的Agent的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
class Dueling_DeepQNetwork(object):
#replace_target_iter为更新target network的步数,防止target network和eval network差别过大
#memory_size为buffer储存记忆上线,方便使用以前记忆学习
def __init__(self, n_actions, n_features,learning_rate=0.01,reward_decay=0.9,e_greedy=0.9,replace_target_iter=300,memory_size=500,batch_size=32,e_greedy_increment=None,output_graph=False):
self.n_actions = n_actions
self.n_features = n_features
self.lr = learning_rate
self.gamma = reward_decay
self.epsilon_max = e_greedy # epsilon后面奖励对前面的递减参数
self.replace_target_iter = replace_target_iter # 更换 target_net 的步数
self.memory_size = memory_size # 记忆上限
self.batch_size = batch_size # 每次更新时从 memory 里面取多少记忆出来
self.epsilon_increment = e_greedy_increment # epsilon 的增量
#epsilon = 0等于0时,后面的奖励创传不到前面,前面的状态就开启随机探索模式
self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max # 是否开启探索模式, 并逐步减少探索次数

# 记录学习次数 (用于判断是否更换 target_net 参数)
self.learn_step_counter = 0

# 初始化全 0 记忆 [s, a, r, s_], 实际上feature为状态的维度,n_features*2分别记录s和s_,+2记录a和r
self.memory = np.zeros((self.memory_size, n_features*2+2))

self._build_net()

#替换 target net 的参数
t_params = tf.get_collection('target_net_params') #提取 target_net 的参数
e_params = tf.get_collection('eval_net_params') # 提取 eval_net 的参数
#将eval_network中每一个variable的值赋值给target network的对应变量
self.replace_target_op = [tf.assign(t, e) for t, e in zip(t_params, e_params)] #更新 target_net 参数


self.sess = tf.Session()

if output_graph:
tf.summary.FileWriter("logs/", self.sess.graph)

self.sess.run(tf.global_variables_initializer())
#用于记录# 记录所有 cost 变化
self.cost_his = []

#李宏毅老师克重的relpay buffer,通过以往的记忆中不断训练
#这是DQN变为off-policy的核心
def store_transition(self, s, a, r, s_):
#如果DeepQNetwork中定义了memory_counter,进行记忆存储
if not hasattr(self, 'memory_counter'):
self.memory_counter = 0

#记录一条 [s, a, r, s_] 记录
transition = np.hstack((s, [a, r], s_))

#总 memory 大小是固定的, 如果超出总大小, 旧 memory 就被新 memory 替换
index = self.memory_counter % self.memory_size #类似hashmap赋值思想
self.memory[index, :] = transition #进行替换

self.memory_counter += 1

#建立神经网络
#此处建立两个申请网络,一个为target network,用于得到q现实。一个为eval_network,用于得到q估计
#target network和eval_network结构一样,target network用比较老的参数,eval_network为真正训练的神经网络
def _build_net(self):
tf.reset_default_graph() #清空计算图

#创建eval神经网络,及时提升参数
self.s = tf.placeholder(tf.float32, [None, self.n_features], name='s') # 用来接收 observation,即神经网络的输入
self.q_target = tf.placeholder(tf.float32, [None, self.n_actions], name='Q_target') # q_target的值, 这个之后会通过计算得到,神经网络的输出
#eval_net域下的变量
with tf.variable_scope('eval_net'):
#c_names用于在一定步数之后更新target network
#GLOBAL_VARIABLES作用是collection默认加入所有的Variable对象,用于共享
c_names = ['eval_net_params', tf.GraphKeys.GLOBAL_VARIABLES]
n_l1 = 10 #n_l1为network隐藏层神经元的个数
w_initializer = tf.random_normal_initializer(0.,0.3)
b_initializer = tf.constant_initializer(0.1)

#eval_network第一层全连接神经网络
with tf.variable_scope('l1'):
w1 = tf.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer, collections=c_names)
b1 = tf.get_variable('b1', [1, n_l1], initializer=b_initializer, collections=c_names)
l1 = tf.nn.relu(tf.matmul(self.s, w1)+b1)

#状态的价值
with tf.variable_scope('value'):
w21 = tf.get_variable('w21', [n_l1, 1], initializer=w_initializer, collections=c_names)
b21 = tf.get_variable('b21', [1], initializer=b_initializer, collections=c_names)
vs_out = tf.matmul(l1, w21) + b21

#动作的advantage
with tf.variable_scope('advantage'):
w22 = tf.get_variable('w22', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names)
b22 = tf.get_variable('b22', [1, self.n_actions], initializer=b_initializer, collections=c_names)
aa = tf.matmul(l1, w22) + b22
#为了不让A直接学成了Q, 我们减掉了A的均值,此时A的均值始终为0
aa_out = aa - tf.reduce_mean(aa, axis=1, keep_dims=True)

#合并V和A, 求出q估计值
with tf.variable_scope('Q'):
self.q_eval = vs_out + aa_out

with tf.variable_scope('loss'): # 求误差
#使用平方误差
self.loss = tf.reduce_mean(tf.squared_difference(self.q_target, self.q_eval))

with tf.variable_scope('train'): # 梯度下降
optimizer = tf.train.RMSPropOptimizer(self.lr)
self._train_op = optimizer.minimize(self.loss)

#创建target network,输入选择一个action后的状态s_,输出q_target
self.s_ = tf.placeholder(tf.float32, [None, self.n_features], name='s_') # 接收下个 observation
with tf.variable_scope('target_net'):
c_names = ['target_net_params', tf.GraphKeys.GLOBAL_VARIABLES]

# target_net 的第一层fc, collections 是在更新 target_net 参数时会用到
with tf.variable_scope('l1'):
w1 = tf.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer, collections=c_names)
b1 = tf.get_variable('b1', [1, n_l1], initializer=b_initializer, collections=c_names)
l1 = tf.nn.relu(tf.matmul(self.s_, w1) + b1)

#状态的价值
with tf.variable_scope('value'):
w21 = tf.get_variable('w21', [n_l1, 1], initializer=w_initializer, collections=c_names)
b21 = tf.get_variable('b21', [1], initializer=b_initializer, collections=c_names)
vs_out = tf.matmul(l1, w21) + b21

#动作的advantage
with tf.variable_scope('advantage'):
w22 = tf.get_variable('w22', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names)
b22 = tf.get_variable('b22', [1, self.n_actions], initializer=b_initializer, collections=c_names)
aa = tf.matmul(l1, w22) + b22
#为了不让A直接学成了Q, 我们减掉了A的均值,此时A的均值始终为0
aa_out = aa - tf.reduce_mean(aa, axis=1, keep_dims=True)

#合并V和A, 求出q估计值
with tf.variable_scope('Q'):
self.q_next = vs_out + aa_out

print(self.q_next)

def choose_action(self, observation):
#根据observation(state)选行为
#使用eval network选出state下的行为估计
#将observation的shape变为(1, size_of_observation),行向量变为列向量才能与NN维度统一
observation = observation[np.newaxis, :]

if np.random.uniform() < self.epsilon:
action_value = self.sess.run(self.q_eval, feed_dict={self.s:observation})
action = np.argmax(action_value)

else:
action = np.random.randint(0, self.n_actions) #随机选择

return action

def learn(self):
if self.learn_step_counter % self.replace_target_iter ==0:
self.sess.run(self.replace_target_op)
print('\ntarget_params_replaced\n')

#从memory中随机抽取batch_size这么多记忆
if self.memory_counter > self.memory_size: #说明记忆库已经存满,可以从记忆库任意位置收取
sample_index = np.random.choice(self.memory_size, size=self.batch_size)
else: #记忆库还没有存满,从现有的存储记忆提取
sample_index = np.random.choice(self.memory_counter, size=self.batch_size)

batch_memory= self.memory[sample_index, :]

# 获取q_next即q现实(target_net产生的q)和q_eval(eval_net产生的q)
#q_next和q_eval都是一个向量,包含了对应状态下所有动作的q值
#实际上feature为状态的维度,batch_memory[:, -self.n_features:]为s_,即状态s采取动作action后的状态s_, batch_memory[:, :self.n_features]为s

# 获取q_next即q现实(target_net产生的q)和q_eval(eval_net产生的q)
#q_next和q_eval都是一个向量,包含了对应状态下所有动作的q值
#实际上feature为状态的维度,batch_memory[:, -self.n_features:]为s_,即状态s采取动作action后的状态s_, batch_memory[:, :self.n_features]为s
#q_next, q_eval的维度为[None,n_actions]
q_next, q_eval = self.sess.run([self.q_next, self.q_eval], feed_dict={self.s_: batch_memory[:, -self.n_features:],self.s: batch_memory[:, :self.n_features]})

#下面这几步十分重要. q_next, q_eval 包含所有 action 的值, 而我们需要的只是已经选择好的 action 的值, 其他的并不需要.所以我们将其他的 action 值全变成 0, 将用到的 action 误差值 反向传递回去, 作为更新凭据.
#这是我们最终要达到的样子, 比如 q_target - q_eval = [1, 0, 0] - [-1, 0, 0] = [2, 0, 0]
# q_eval = [-1, 0, 0] 表示这一个记忆中有我选用过 action 0, 而action0带来的 Q(s, a0)=-1,而其他的 Q(s, a1)=Q(s, a2)=0
# q_target = [1, 0, 0] 表示这个记忆中的 r+gamma*maxQ(s_) = 1, 而且不管在 s_ 上我们取了哪个 action
# 我们都需要对应上 q_eval 中的 action 位置, 所以就将 q_target的1放在了 action0的位置.

# 下面也是为了达到上面说的目的, 不过为了更方面让程序运算, 达到目的的过程有点不同.# 是将 q_eval 全部赋值给 q_target, 这时 q_target-q_eval 全为 0,
# 不过 我们再根据 batch_memory 当中的 action 这个 column 来给 q_target 中的对应的 memory-action 位置来修改赋值.
# 使新的赋值为 reward + gamma * maxQ(s_), 这样 q_target-q_eval 就可以变成我们所需的样子.

q_target = q_eval.copy()
#每个样本下标
batch_index = np.arange(self.batch_size, dtype=np.int32)
#记录每个样本执行的动作
eval_act_index = batch_memory[:, self.n_features].astype(int)
#记录每个样本动作的奖励
reward = batch_memory[:, self.n_features + 1]

#生成每个样本中q值对应动作的更新,即生成的q现实,
q_target[batch_index, eval_act_index]=reward+self.gamma * np.max(q_next, axis=1)

#假如在这个 batch 中, 我们有2个提取的记忆, 根据每个记忆可以生产3个 action 的值:
#q_eval =[[1, 2, 3],[4, 5, 6]], 另q_target = q_eval.copy()
#然后根据 memory 当中的具体 action 位置来修改 q_target 对应 action 上的值:
#比如在:记忆 0 的 q_target 计算值是 -1, 而且我用了 action 0;忆1的 q_target 计算值是-2, 而且我用了 action 2:
#q_target =[[-1, 2, 3],[4, 5, -2]]
#所以 (q_target - q_eval) 就变成了:[[(-1)-(1), 0, 0],[0, 0, (-2)-(6)]]
#最后我们将这个 (q_target - q_eval) 当成误差, 反向传递会神经网络
#所有为 0 的 action 值是当时没有选择的 action, 之前有选择的 action 才有不为0的值.
#我们只反向传递之前选择的 action 的值,
_, self.cost = self.sess.run([self._train_op, self.loss],feed_dict={self.s: batch_memory[:, :self.n_features],self.q_target: q_target})

self.cost_his.append(self.cost) # 记录 cost 误差

#每调用一次learn,降低一次epsilon,即行为随机性
self.epsilon = self.epsilon + self.epsilon_increment if self.epsilon < self.epsilon_max else self.epsilon_max

self.learn_step_counter += 1

def plot_cost(self):
import matplotlib.pyplot as plt
plt.plot(np.arange(len(self.cost_his)), self.cost_his)
plt.ylabel('Cost')
plt.xlabel('training steps')
plt.show()

Policy Gradient

之前说的SARSA、Q-learning、DQN学习的都是在状态s下动作a的价值,属于value-based的方法。而Policy Gradient学习的是在状态s下每个动作a被选择的概率,属于policy-based的方法。

我们先说Policy Gradient的整体思想,之后将整体思想进行拆分,产生Policy Gradient每一步的流程。

Policy Gradient的网络要学习的是状态下动作输出的概率。按照常识来讲,可以获得越大的奖励的动作应该被选择的概率是越大的。需要注意的是这里所说的奖励并不是一个动作单步的奖励,而是当整个游戏结束时,这个动作整体所产生的价值,这个价值我们叫做advantage。因此我们的网络要学习的目标就是:按照每个动作的概率进行选择时,获得的奖励的期望值是最大的。

https://www.jianshu.com/p/428b640046aa

策略梯度

在PG算法中,我们的Agent又被称为Actor,Actor对于一个特定的任务,都有自己的一个策略π,策略π通常用一个神经网络表示,其参数为θ。从一个特定的状态state出发,一直到任务的结束,被称为一个完整的eposide,在每一步,我们都能获得一个奖励r,一个完整的任务所获得的最终奖励被称为R。这样,一个有T个时刻的eposide,Actor不断与环境交互,形成如下的序列τ:

image-20210316210642889

image-20210311213430270

这样一个序列τ是不确定的,因为Actor在不同state下所采取的action可能是不同的,一个序列τ发生的概率为:

image-20210311213447597

序列τ所获得的奖励为每个阶段所得到的奖励的和,称为R(τ)。因此,在Actor的策略为π的情况下,所能获得的期望奖励为:

image-20210311213514694

而我们的期望是调整Actor的策略π,使得期望奖励最大化,于是我们有了策略梯度的方法,既然我们的期望函数已经有了,我们只要使用梯度提升的方法更新我们的网络参数θ(即更新策略π)就好了,所以问题的重点变为了求参数的梯度。梯度的求解过程如下:

image-20210316210915795

首先利用log函数求导的特点进行转化,随后用N次采样的平均值来近似期望,最后,我们将pθ展开,将与θ无关的项去掉,即得到了最终的结果。

image-20210311213315228

image-20210311213339458

image-20210316211600007

image-20210316211642310

image-20210316211734692

image-20210316211747501

image-20210316211821933

policy gradient 的Agent的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from maze_env_drl import Maze
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt

class PolicyGradient(object):

def __init__(self, n_actions, n_features, learning_rate=0.01, reward_decay=0.95, output_graph=False):
self.n_actions = n_actions
self.n_features = n_features
self.lr = learning_rate # 学习率
self.gamma = reward_decay # reward 递减率

#因为需要模拟整个回合才能得到最终奖励,才能进行网络学习,所以需要将达到终点点整个序列的状态、动作、奖励记录下来
self.ep_obs, self.ep_as, self.ep_rs = [], [], [] #这是我们存储 回合信息的 list

#下面与之前是一样的
self._build_net() #建立网络
self.sess = tf.Session()

if output_graph:
tf.summary.FileWriter("logs/", self.sess.graph)

self.sess.run(tf.global_variables_initializer())

#与此前不同之前储存记忆利用的是replay buffer机制。存储内容分可能来自不同执行序列和不同参数的神经网络
#此处只是为了储存达到终点前一个系列的动作
def store_transition(self, s, a, r):
self.ep_obs.append(s)
self.ep_as.append(a)
self.ep_rs.append(r)

#构建网络,因为直接走到终点,奖励是可以观察的,不再需要target_work去求st+1的最大价值
def _build_net(self):
tf.reset_default_graph() #清空计算图
with tf.name_scope('input'):
self.tf_obs = tf.placeholder(tf.float32, [None, self.n_features], name="observations") #接收observation
#标签维度为[batch_size, 1]。标签是具体动作,不是概率
self.tf_acts = tf.placeholder(tf.int32, [None, ], name="actions_num") # 接收我们在这个回合中选过的actions
#接收每个state-action所对应的value(通过reward计算),注意此处不是单步的奖励,而是整个一些列动作的奖励,vt=本reward + 衰减的未来reward
self.tf_vt = tf.placeholder(tf.float32, [None, ], name="actions_value")

#权重初始化方式
w_initializer = tf.random_normal_initializer(0.,0.3)
b_initializer = tf.constant_initializer(0.1)
n_l1 = 10 #n_l1为network隐藏层神经元的个数
with tf.variable_scope("lay1"):
w1 = tf.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer)
b1 = tf.get_variable('b1', [1, n_l1], initializer=b_initializer)
l1 = tf.nn.tanh(tf.matmul(self.tf_obs, w1)+b1)
with tf.variable_scope("lay2"):
w2 = tf.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer)
b2 = tf.get_variable('b2', [1, self.n_actions], initializer=b_initializer)
output = tf.matmul(l1, w2)+b2

#ploicy gradient求解在一个状态下每个动作的概率,因此使用softmax出动作概率
self.all_act_prob = tf.nn.softmax(output)

with tf.name_scope('loss'):
#交叉熵作为损失函数,训练样本中选中的action即为我们的label,因为还没求出最终损失,此处用reduce_sum
neg_log_prob = -tf.reduce_sum(tf.log(self.all_act_prob)*tf.one_hot(self.tf_acts, self.n_actions), axis=1)
#可是视为当前这个局部损失*对应权重,可以看到state-action的value越大,权重越大
#可以认为state-action的value越大,就越会顺着这个梯度下降,这个动作就越可信
#若不加基准线b为如下方式:
#loss = tf.reduce_mean(neg_log_prob * (self.tf_vt))

#加入基准线b的损失
loss = tf.reduce_mean(neg_log_prob * (self.tf_vt-tf.reduce_mean(self.tf_vt)))

with tf.name_scope('train'):
self.train_op = tf.train.AdamOptimizer(self.lr).minimize(loss)

#之前DQN根据最大动作价值选择动作,ploicy gradient根据网络输出的动作概率,依照概率选择动作
def choose_action(self, observation):
# 所有action的概率
prob_weights = self.sess.run(self.all_act_prob, feed_dict={self.tf_obs: observation[np.newaxis, :]})
#按照概率选择动作
action = np.random.choice(range(prob_weights.shape[1]), p=prob_weights.ravel())
return action

#用于计算回合的state-action value
def _discount_and_norm_rewards(self):
#先计算单步的奖励,在逐步将后面的奖励逐步衰减加到前面
discounted_ep_rs = np.zeros_like(self.ep_rs)
running_add = 0

#将后面的奖励逐步衰减加到前面
for t in reversed(range(len(self.ep_rs))):
running_add = running_add * self.gamma + self.ep_rs[t]
discounted_ep_rs[t] = running_add

#防止每个回合计算出的奖励量纲不同,进行正态标准化
discounted_ep_rs = np.array(discounted_ep_rs,dtype=np.float)
discounted_ep_rs -= np.mean(discounted_ep_rs)
discounted_ep_rs /= np.std(discounted_ep_rs)

return discounted_ep_rs

def learn(self):
#因为训练时传入的不是动作单步奖励,执行完动作后的整体奖励,因此先计算执行完动作后的整体奖励
discounted_ep_rs_norm = self._discount_and_norm_rewards()
#np.vstack(self.ep_obs)的shape=[None, n_obs],np.array(self.ep_as)的shape=[None,],discounted_ep_rs_norm的shape=[None,]
#每次的训练样本相当于一和完整流程状态和对应的动作以及对应奖励。
self.sess.run(self.train_op, feed_dict={self.tf_obs: np.vstack(self.ep_obs),self.tf_acts: np.array(self.ep_as),self.tf_vt: discounted_ep_rs_norm})
#次回合动作已经训练完,清空记忆。下次训练需要重新产生训练样本
self.ep_obs, self.ep_as, self.ep_rs = [], [], []

#返回这一回合的state-action value
return discounted_ep_rs_norm

def run_maze(RENDER):
for episode in range(3000):
# 初始化环境
observation = env.reset()
while True:
if RENDER:
# 刷新环境
env.render()
# DQN 根据观测值选择行为
action = RL.choose_action(observation)
# 环境根据行为给出下一个state, reward,是否终止
observation_, reward, done = env.step(action)
#存储记忆
RL.store_transition(observation, action, reward)
#知道获得最终奖励才能进行训练,这是MC的方法
if done:
ep_rs_sum=sum(RL.ep_rs)

if 'running_reward' not in locals():
running_reward = ep_rs_sum
else:
running_reward = running_reward * 0.99 + ep_rs_sum * 0.01

if running_reward> DISPLAY_REWARD_THRESHOLD:
RENDER=True

print("episode:", episode, " reward:", int(running_reward))

vt = RL.learn()

if episode==0:
plt.plot(vt) # plot这个回合的vt
plt.xlabel('episode steps')
plt.ylabel('normalized state-action value')
plt.show()
break #此次模拟完毕

observation = observation_ #还没有获取奖励,去要继续执行模拟

if __name__=="__main__":
RENDER=False
DISPLAY_REWARD_THRESHOLD=300
env = Maze()

RL = PolicyGradient(n_actions=env.n_actions, n_features=env.n_features, learning_rate=0.02, reward_decay=0.99, output_graph=True)
run_maze(RENDER)

image-20210317112639995

可以看出policy gradient是基于回合更新的,因此我们需要需要很大的耐心和环境产生交互样本,进行回合训练,并且由于每个回合是不稳定的,因此我们需要的大量的样本

image-20210317155054551

Q-learning学习的是每个动作的价值,要求动作必须是离散的。

policy gradient和Q-learning都有各自的优缺点。我们可以将两者整合起来,即记忆使用off-policy的学习,也可以使用连续的动作。

下面两段话摘自莫烦python的强化学习教程:Actor-Critic 的 Actor 的前生是 Policy Gradients, 这能让它毫不费力地在连续动作中选取合适的动作, 而 Q-learning 做这件事会瘫痪. 那为什么不直接用 Policy Gradients 呢? 原来 Actor Critic 中的 Critic 的前生是 Q-learning 或者其他的 以值为基础的学习法 , 能进行单步更新, 而传统的 Policy Gradients 则是回合更新, 这降低了学习效率。

image-20210317155324880

Actor 和 Critic, 他们都能用不同的神经网络来代替 . 在 Policy Gradients 的影片中提到过, 现实中的奖惩会左右 Actor 的更新情况. Policy Gradients 也是靠着这个来获取适宜的更新. 那么何时会有奖惩这种信息能不能被学习呢? 这看起来不就是 以值为基础的强化学习方法做过的事吗. 那我们就拿一个 Critic 去学习这些奖惩机制, 学习完了以后. 由 Actor 来指手画脚, 由 Critic 来告诉 Actor 你的那些指手画脚哪些指得好, 哪些指得差, Critic 通过学习环境和奖励之间的关系, 能看到现在所处状态的潜在奖励, 所以用它来指点 Actor 便能使 Actor 每一步都在更新, 如果使用单纯的 Policy Gradients, Actor 只能等到回合结束才能开始更新。

以前我们用过回合的奖励来进行policy gradient的更新,Actor-Critic将回合奖励替换成动作的价值,来对网络进行学习,自然就将Q-learning结合了起来:

image-20210317155528931

image-20210317155542474

在PG策略中,如果我们用Q函数来代替R,同时我们创建一个Critic网络来计算Q函数值,那么我们就得到了Actor-Critic方法。Actor参数的梯度变为:

image-20210311221748445

此时的Critic根据估计的Q值和实际Q值的平方误差进行更新,对Critic来说,其loss为:

image-20210311221814333

AC代码的实现地址为:https://github.com/princewen/tensorflow_practice/tree/master/RL/Basic-AC-Demo

Advantage Actor-Critic(A2C)

image-20210317155623826

image-20210317155634647

image-20210317155755636

image-20210317155831237

image-20210317155955489

image-20210317160425425

image-20210317160942244

image-20210317162607727

本文基于如下架构进行Advantage Actor-Critic的Agent实现。其中Actor和Critic的第一层权重共享:

Q-learning

Q-learning是一种value_based的方法。

image-20210312203323472