【强化学习笔记】(3) DDPG

【强化学习笔记】(3) DDPG算法是一种 on line 的深度学习算法 可以用于连续的任务控制比如控制机器人完成任务

大家好,欢迎来到IT知识分享网。

1. DDPG

Deep Deterministic Policy Gradient (DDPG) 算法是一种off-line的深度学习算法,可以用于连续的任务控制比如控制机器人完成任务。

1.1 DDPG基本原理

Q-learning(数据过量,引入神经网络)–> DQN (无法拓展连续动作,引入策略网络) –> AC结构 (保留经验回放和目标网络)

DDPGActor-Critic (AC) 结构的基础上结合了确定性策略和经验回放的思想,用于解决连续动作空间问题。

DDPG算法的主要的特点和步骤:

  1. Actor-Critic架构:DDPG算法基于AC框架,其中Actor负责学习确定性策略,即在给定状态下直接输出动作值; Critic负责学习值函数,即输出的是对当前状态-动作对的评价,相当于输出一个q值。
  2. 确定性策略:与传统的策略梯度方法不同, DDPG使用确定性策略,直接输出动作值而不是动作的概率分布。有助于在连续动作空间中更好地学习策略。
  3. 经验回放:解决样本相关性和稳定性问题,DDPG引入经验回放机制,将智能体与环境交互的经验存储在replay buffer中,训练时随机采样。
  4. 目标网络:DDPG有四个网络,其中包含目标的Actor和Critic网络,用于估计目标Q值和策略。目标网络的参数通过软更新的方法从主网络中得到。
  5. 噪声探索:确定性策略输出的动作为确定动作,缺乏对环境的探索,因此需要在训练阶段,给Actor网络输出的动作加入噪声,从而让智能体续具备一定的探索能力。

为什么引入目标网络?

在深度强化学习中,引入目标网络是为了解决训练过程中的不稳定性和提高算法的收敛性。具体来说,引入目标网络主要有以下两个作用:

稳定训练:在训练深度强化学习模型时,目标网络的引入可以减少训练过程中的“moving target”问题。在训练Q网络或者Actor网络时,如果每次更新都直接影响到当前的网络参数,会导致目标值的变化,从而使得训练不稳定。通过引入目标网络,可以固定目标网络的参数一段时间,使得目标值更加稳定,有利于训练的收敛。

减少估计误差:在深度强化学习中,通常会使用TD目标来更新Q值或者Actor策略。而直接使用当前的网络来估计TD目标可能会引入较大的估计误差,导致训练不稳定。通过引入目标网络,可以使用目标网络来估计TD目标,减少估计误差,从而提高算法的稳定性和收敛性。

1.2 伪代码

【强化学习笔记】(3) DDPG

DDPG中一共有四个网络: ACTOR, CRITIC, TARGET ACTOR, TARGET CRITIC

1. 训练时每个timestep都需要对四个网络进行更新,通过TD error对critic网络进行更新:

 y_i = r_i+\gamma Q^{\prime}\left(s_{i+1}, \mu^{\prime}\left(s_{i+1} \mid \theta^{\mu^{\prime}}\right) \mid \theta^{Q^{\prime}}\right)  

 L=\frac{1}{N} \sum_i\left(y_i-Q\left(s_i, a_i \mid \theta^Q\right)\right)^2

可以看作critic网络更新就和DQN更新类似,需要Q网络和target-Q网络输出的q-value尽可能的接近。target-q通过预测的下一步的state对应的q值*gamma+r得到预测的当前state对应的q-value。

特别注意:q网络的输入是(s, a)输出是q-值

2. actor网络通过策略梯度进行更新,希望当前的state作为输入给策略输出的action所构成的状态动作对所对应的q值(由critic网络得到)是最大的,即-q越小越好:

\left.\left.\nabla_{\theta^\mu} J \approx \frac{1}{N} \sum_i \nabla_a Q\left(s, a \mid \theta^Q\right)\right|_{s=s_i, a=\mu\left(s_i\right)} \nabla_{\theta^\mu} \mu\left(s \mid \theta^\mu\right)\right|_{s_i}

3. target网络则通过软更新进行更新

1.3 代码说明

1.3.1 创建网络

搭建Actor-Critic网络框架:

class Actor(nn.Module): def __init__(self, num_observation, num_actions, n_dim=[128, 128]): super(Actor, self).__init__() layers = [] layer_shape = [num_observation] + list(n_dim) + [num_actions] activation = nn.ReLU for j in range(len(layer_shape) - 2): layers += [nn.Linear(layer_shape[j], layer_shape[j+1]), activation()] layers += [nn.Linear(layer_shape[len(layer_shape)-2], layer_shape[len(layer_shape)-1]), nn.Tanh()] self.net = nn.Sequential(*layers) def forward(self, x): x = self.net(x) return x class Critic(nn.Module): def __init__(self, num_observation, num_actions, n_dim=[128, 128]): super(Critic, self).__init__() layers = [] layer_shape = [num_observation+num_actions] + list(n_dim) + [1] activation = nn.ReLU for j in range(len(layer_shape) - 1): layers += [nn.Linear(layer_shape[j], layer_shape[j+1]), activation] self.net = nn.Sequential(*layers) def forward(self, x, a): cat = torch.cat([x, a], dim=1) x = self.net(cat) return x 

 初始化网络

 self.Actor_eval = Actor(self.num_observations, self.num_actions) self.Actor_target = Actor(self.num_observations, self.num_actions) self.Actor_target.load_state_dict(self.Actor_eval.load_state_dict()) self.Critic_eval = Critic(self.num_observations, self.num_actions) self.Critic_target = Critic(self.num_observations, self.num_actions) self.Critic_target.load_state_dict(self.Critic_eval.load_state_dict())

1.3.2 噪声探索

噪声探索,首先选择action并且对action加入噪声,实现exploration noise

 a = self.select_action(s) a = np.clip(np.random.normal(a, var), -2, 2)

1.3.3 更新网络

Actor:更新策略网络,通过当前的state得到策略网络输出的动作,将\left ( s, \mu(s) \right )输入给critic网络得到q值作为loss进行梯度更新

 # update Actor # loss = -q_new = -Q(s, mu(s)) mu_s = self.Actor_eval(s) q_new = self.Critic_eval(s, mu_s) # if a is a correct action, Q->0, policy gradient loss_action = -torch.mean(q_new) self.optimizer_a.zero_grad() loss_action.backward() self.optimizer_a.step()

Critic: 更新Q网络,与DQN的更新方式类似

  1. 目标策略网络(target actor)预测s_{t+1}对应的a_{t+1},将\left ( s_{t+1}, a_{t+1} \right )输入到目标Q网络(target critic)之后得到当前state-action对应的目标q值,并通过贝尔曼方程由t+1时刻的q值计算t时刻的q值,作为q-target
  2. 由replay buffer中的动作状态计算当前这一时刻的q值,由critic网络得到,作为q-value
  3. 通过TD error计算loss, loss = (q_target – q_value)^2
 # update Critic, similar to DQN, utilizing replay buffer # q_target = y = r + gamma * Q_(s',a') a_ = self.Actor_target(s_) # predict the action of q_target q_ = self.Critic_target(s_, a_) q_target = r + self.gamma*q_ # q_v = a = Q(s, a) q_v = self.Critic_eval(s, a) # td_error = (y - a)^2 = (q_target - q)^2 let q -> q_target td_error = self.loss_td(q_target, q_v) self.optimizer_c.zero_grad() td_error.backward() self.optimizer_c.step()

target critic, target actor: 

 # update the target network (soft update) # theta(q') <- tau*theta(q) + (1-tau)*theta(q') for critic # theta(mu') <- tau*theta(mu) + (1-tau)*theta(mu') for actor for x in self.Actor_target.state_dict().keys(): eval('self.Actor_target.' + x + '.data.mul_((1-self.tau))') eval('self.Actor_target.' + x + '.data.add_(self.tau*self.Actor_eval.' + x + '.data)') for x in self.Critic_target.state_dict().keys(): eval('self.Critic_target.' + x + '.data.mul_((1-self.tau))') eval('self.Critic_target.' + x + '.data.add_(self.tau*self.Critic_eval.' + x + '.data)')

1.4 完整代码

import torch import numpy as np import torch.nn as nn import torch.nn.functional as F import time import gym MAX_EPISODES = 200 MAX_EP_STEPS = 200 LR_A = 0.001 # learning rate for actor LR_C = 0.002 # learning rate for critic GAMMA = 0.9 # reward discount TAU = 0.01 # soft replacement MEMORY_CAPACITY = 10000 BATCH_SIZE = 32 RENDER = False ENV_NAME = 'Pendulum-v0' class DDPG: def __init__(self, env, num_actions, num_observations, action_bound, gamma=0.99, learning_rate_a=1e-3, learning_rate_c=0.002, batch_size=128, memory_capacity=10000, tau=0.01, render=False, device='cpu'): self.env = env self.render = render self.device = device self.num_actions = num_actions self.num_observations = num_observations self.action_bound = action_bound self.gamma = gamma self.lr_a = learning_rate_a self.lr_c = learning_rate_c self.tau = tau self.batch_size = batch_size self.memory_capacity = memory_capacity self.ep_reward = 0 self.memory = np.zeros((self.memory_capacity, self.num_observations*2 + num_actions + 1), dtype=np.float32) self.pointer = 0 self.Actor_eval = Actor(self.num_observations, self.num_actions) self.Actor_target = Actor(self.num_observations, self.num_actions) self.Actor_target.load_state_dict(self.Actor_eval.load_state_dict()) self.Critic_eval = Critic(self.num_observations, self.num_actions) self.Critic_target = Critic(self.num_observations, self.num_actions) self.Critic_target.load_state_dict(self.Critic_eval.load_state_dict()) self.optimizer_a = torch.optim.Adam(self.Actor_eval.parameters(), lr=self.lr_a) self.optimizer_c = torch.optim.Adam(self.Critic_eval.parameters(), lr=self.lr_c) self.loss_td = nn.MSELoss() def select_action(self, s): s = torch.unsqueeze(torch.FloatTensor(s), 0) return self.Actor_eval(s)[0].detach() def store_transition(self, s, a, r, s_): transition = np.hstack((s, a, [r], s_)) index = self.pointer % MEMORY_CAPACITY self.memory[index, :] = transition self.pointer += 1 def optimal_(self): # update the target network (soft update) # theta(q') <- tau*theta(q) + (1-tau)*theta(q') for critic # theta(mu') <- tau*theta(mu) + (1-tau)*theta(mu') for actor for x in self.Actor_target.state_dict().keys(): eval('self.Actor_target.' + x + '.data.mul_((1-self.tau))') eval('self.Actor_target.' + x + '.data.add_(self.tau*self.Actor_eval.' + x + '.data)') for x in self.Critic_target.state_dict().keys(): eval('self.Critic_target.' + x + '.data.mul_((1-self.tau))') eval('self.Critic_target.' + x + '.data.add_(self.tau*self.Critic_eval.' + x + '.data)') indices = np.random.choice(self.memory_capacity, size=self.batch_size) transition = self.memory[indices, :] s = torch.FloatTensor(transition[:, :self.num_observations]) a = torch.FloatTensor(transition[:, self.num_observations: self.num_observations+self.num_actions]) r = torch.FloatTensor(transition[:, -self.num_observations-1: -self.num_observations]) s_ = torch.FloatTensor(transition[:, -self.num_observations:]) # update Actor # loss = -q_new = -Q(s, mu(s)) mu_s = self.Actor_eval(s) q_new = self.Critic_eval(s, mu_s) # if a is a correct action, Q->0, policy gradient loss_action = -torch.mean(q_new) self.optimizer_a.zero_grad() loss_action.backward() self.optimizer_a.step() # update Critic, similar to DQN, utilizing replay buffer # q_target = y = r + gamma * Q_(s',a') a_ = self.Actor_target(s_) # predict the action of q_target q_ = self.Critic_target(s_, a_) q_target = r + self.gamma*q_ # q_v = a = Q(s, a) q_v = self.Critic_eval(s, a) # td_error = (y - a)^2 = (q_target - q)^2 let q -> q_target td_error = self.loss_td(q_target, q_v) self.optimizer_c.zero_grad() td_error.backward() self.optimizer_c.step() def learn(self, num_episodes): """learning process interact with env and update the network""" var = 3 # control exploration for i_episode in range(num_episodes): # Initialize the environment and get its state s, info = self.env.reset() s = torch.tensor(s, dtype=torch.float32, device=self.device).unsqueeze(0) for j in range(200): if self.render: self.env.render() # add exploration noise # normal distribution with mean 'a' and standard deviation 'var' a = self.select_action(s) a = np.clip(np.random.normal(a, var), -2, 2) s_, r, done, info = self.env.step(a.item()) if done: s_ = None else: s_ = torch.tensor(s_, dtype=torch.float32, device=self.device).unsqueeze(0) self.store_transition(s, a, r/10, s_) if self.pointer > self.memory_capacity: var *= .9995 self.optimal_() s = s_ self.ep_reward += r if j == MAX_EP_STEPS-1: print('Episode:', i_episode, ' Reward: %i' % int(self.ep_reward), 'Explore: %.2f' % var, ) if self.ep_reward > -300: self.render = True break class Actor(nn.Module): def __init__(self, num_observation, num_actions, n_dim=[128, 128]): super(Actor, self).__init__() layers = [] layer_shape = [num_observation] + list(n_dim) + [num_actions] activation = nn.ReLU for j in range(len(layer_shape) - 2): layers += [nn.Linear(layer_shape[j], layer_shape[j+1]), activation()] layers += [nn.Linear(layer_shape[len(layer_shape)-2], layer_shape[len(layer_shape)-1]), nn.Tanh()] self.net = nn.Sequential(*layers) def forward(self, x): x = self.net(x) return x class Critic(nn.Module): def __init__(self, num_observation, num_actions, n_dim=[128, 128]): super(Critic, self).__init__() layers = [] layer_shape = [num_observation+num_actions] + list(n_dim) + [1] activation = nn.ReLU for j in range(len(layer_shape) - 1): layers += [nn.Linear(layer_shape[j], layer_shape[j+1]), activation] self.net = nn.Sequential(*layers) def forward(self, x, a): cat = torch.cat([x, a], dim=1) x = self.net(cat) return x 

参考链接

【1】强化学习——DDPG算法(附pytorch代码)-CSDN博客

【2】论文 https://arxiv.org/pdf/1509.02971.pdf

【3】深度强化学习笔记——DDPG原理及实现(pytorch)_ddpg算法原理-CSDN博客

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/122889.html

(0)
上一篇 2025-10-13 11:33
下一篇 2025-10-13 12:00

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信