我正在尝试训练一个模型,让它从包含 10 个数字的列表中选出最大的数字。
例如,我有一个列表 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]。
给定该输入,模型应该找到最大的数字,在这种情况下为 9。
我有以下奖励/惩罚规则:如果模型选中了最大数字所在的位置,得 +10 分;否则得 -1 分。
我让模型玩了 1000 次这个游戏,但准确率始终在 10% 左右,这与完全随机挑选的结果相同。
我做错了什么?
这是我的代码:
import random
import torch
import torch.nn as nn
from tensordict import TensorDict
from tensordict.nn import TensorDictModule, InteractionType
from torch import optim
from torch.distributions import Categorical
from torchrl.modules import ProbabilisticActor, ValueOperator
from torchrl.objectives import ClipPPOLoss
from torchrl.objectives.value.functional import generalized_advantage_estimate
def main():
    """Train a small PPO actor-critic to point at the largest of 10 numbers.

    Every episode is a single decision: ten random integers in [1, 100] are
    drawn, the actor samples an index, and the reward is +10 when that index
    is the first position holding the maximum and -1 otherwise. Both networks
    are updated after every episode. A line is printed per episode, followed
    by the overall accuracy (correct picks / episodes, measured while
    training with sampled actions).
    """
    seed = 3
    torch.manual_seed(seed)

    # Find the maximum number in the list of 10 numbers.
    # Actor network: 10 input numbers -> 10 logits (one per candidate index).
    policy_network = nn.Sequential(nn.Linear(10, 64), nn.ReLU(), nn.Linear(64, 10))
    logits_module = TensorDictModule(
        module=policy_network,
        in_keys=["numbers"],
        out_keys=["logits"],
    )
    # InteractionType.RANDOM: the action is sampled from the Categorical
    # distribution instead of being chosen deterministically.
    actor = ProbabilisticActor(
        module=logits_module,
        in_keys=["logits"],
        out_keys=["action"],
        distribution_class=Categorical,
        default_interaction_type=InteractionType.RANDOM,
        return_log_prob=True,
    )

    # Critic network: 10 input numbers -> a single state-value estimate.
    value_network = nn.Sequential(nn.Linear(10, 64), nn.ReLU(), nn.Linear(64, 1))
    critic = ValueOperator(
        module=value_network,
        in_keys=["numbers"],
        out_keys=["value"],
    )

    loss_module = ClipPPOLoss(actor_network=actor, critic_network=critic)
    loss_module.set_keys(
        advantage="advantage",
        value_target="value_target",
        value="value",
        action="action",
        sample_log_prob="sample_log_prob",
    )

    # Training
    max_episode = 1000
    optimizer = optim.Adam(
        [*policy_network.parameters(), *value_network.parameters()]
    )
    number_of_correct_decisions = 0

    for episode in range(max_episode):
        # Generate a list of 10 random integers.
        numbers = [random.randint(1, 100) for _ in range(10)]

        step_td = TensorDict({"numbers": torch.FloatTensor(numbers)}, batch_size=[])
        actor(step_td)
        chosen_index = step_td["action"].item()
        critic(step_td)
        # The stored log-probability serves as the "old policy" reference, so
        # it must not carry gradients.
        step_td["sample_log_prob"] = step_td["sample_log_prob"].detach()

        # The "next" observation is the same list: the episode ends after a
        # single decision, so there is no real successor state.
        next_td = TensorDict({"numbers": torch.FloatTensor(numbers)}, batch_size=[])
        critic(next_td)

        correct_index = numbers.index(max(numbers))

        # Reward/Penalty Rules
        picked_max = chosen_index == correct_index
        score = 10 if picked_max else -1
        if picked_max:
            number_of_correct_decisions += 1
        reward = torch.FloatTensor([[score]])

        # Note that we need to use batched input and the output will be in batched form.
        advantage, value_target = generalized_advantage_estimate(
            gamma=0.98,
            lmbda=0.95,
            state_value=step_td["value"].unsqueeze(0),
            next_state_value=next_td["value"].unsqueeze(0),
            reward=reward,
            done=torch.BoolTensor([[1]]),
            terminated=torch.BoolTensor([[1]]),
        )
        step_td["advantage"] = advantage.squeeze(0)
        step_td["value_target"] = value_target.squeeze(0)

        losses = loss_module(step_td)
        critic_term = losses["loss_critic"]
        entropy_term = losses["loss_entropy"]
        objective_term = losses["loss_objective"]
        # NOTE(review): ClipPPOLoss appears to scale loss_entropy by its own
        # entropy_coef already; if so, the extra 0.01 here shrinks the
        # exploration bonus even further -- confirm against the torchrl docs.
        loss = critic_term + 0.01 * entropy_term + objective_term

        max_num = numbers[chosen_index]
        print(
            f"episode: {episode}, score: {score}, numbers: {numbers}, max_num: {max_num}")

        # One optimizer step per episode, i.e. per single sampled decision.
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

    print(f"Accuracy = {number_of_correct_decisions / max_episode}")
# Run the training only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
事实证明,智能体在优化之前没有进行足够的探索。
我误解了 PPO 的概念,以为负奖励会降低选择该动作的概率。
但是,我意识到奖励的绝对大小并不重要。
重要的是奖励之间的相对差异。
例如,如果反复只收到 -1 的奖励,智能体只会认为当前的动作已经是最优的。
换句话说,在这种情况下优化模型只会鼓励智能体继续采取该动作。
智能体只有在经历过不同的奖励之后才会改变动作。
所以,我了解到在优化之前让智能体体验各种不同的奖励是必不可少的。
关于代码,我更改了以下内容:
- 每 100 个 epoch 才优化一次模型,让智能体在优化之前有足够的探索。
- 直接使用 `ClipPPOLoss` 返回的 `loss_entropy`,而不再将其乘以 0.01。动机是增加动作的随机性并鼓励探索。

这是更新后的代码:
import torch
import torch.nn as nn
from tensordict import TensorDict
from tensordict.nn import TensorDictModule, InteractionType
from torch import optim
from torch.distributions import Categorical
from torchrl.modules import ProbabilisticActor, ValueOperator
from torchrl.objectives import ClipPPOLoss
from torchrl.objectives.value.functional import generalized_advantage_estimate
def main():
    """Train a PPO actor-critic to pick the largest entry of a fixed list.

    Each epoch is a single-step episode on the fixed list
    ``[3, 4, ..., 12]``: the actor samples an index and receives +10 when it
    points at the maximum and -1 otherwise. Losses are accumulated and the
    networks are updated once per ``update_interval`` epochs, so that every
    update is based on a batch of differently rewarded decisions. A line is
    printed per epoch, followed by the overall accuracy (correct picks /
    epochs, measured while training with sampled actions).
    """
    seed = 3
    torch.manual_seed(seed)

    # Find the maximum number in the list of 10 numbers.
    # Actor network: 10 input numbers -> 10 logits (one per candidate index).
    policy_network = nn.Sequential(
        nn.Linear(10, 64),
        nn.ReLU(),
        nn.Linear(64, 10),
    )
    policy_module = TensorDictModule(
        module=policy_network,
        in_keys=["numbers"],
        out_keys=["logits"],
    )
    # InteractionType.RANDOM: the action is sampled from the Categorical
    # distribution instead of being chosen deterministically.
    actor = ProbabilisticActor(
        module=policy_module,
        in_keys=["logits"],
        out_keys=["action"],
        distribution_class=Categorical,
        default_interaction_type=InteractionType.RANDOM,
        return_log_prob=True,
    )

    # Critic network: 10 input numbers -> a single state-value estimate.
    value_network = nn.Sequential(
        nn.Linear(10, 64),
        nn.ReLU(),
        nn.Linear(64, 1),
    )
    value_operator = ValueOperator(
        module=value_network,
        in_keys=["numbers"],
        out_keys=["value"],
    )

    loss_module = ClipPPOLoss(
        actor_network=actor,
        critic_network=value_operator,
    )
    loss_module.set_keys(
        advantage="advantage",
        value_target="value_target",
        value="value",
        action="action",
        sample_log_prob="sample_log_prob",
    )

    # Training
    max_epoch = 5000
    update_interval = 100
    optimizer = optim.Adam(
        list(policy_network.parameters()) + list(value_network.parameters())
    )
    number_of_correct_decisions = 0
    total_loss = 0
    numbers = [3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
    # The list never changes, so the correct answer is loop-invariant.
    correct_index = numbers.index(max(numbers))

    for epoch in range(max_epoch):
        current_tensor_dict = TensorDict(
            {"numbers": torch.FloatTensor(numbers)}, batch_size=[]
        )
        actor(current_tensor_dict)
        max_index = current_tensor_dict["action"].item()
        value_operator(current_tensor_dict)
        # The stored log-probability serves as the "old policy" reference, so
        # it must not carry gradients.
        current_tensor_dict["sample_log_prob"] = (
            current_tensor_dict["sample_log_prob"].detach()
        )

        # The "next" observation is the same list: the episode ends after a
        # single decision, so there is no real successor state.
        next_tensor_dict = TensorDict(
            {"numbers": torch.FloatTensor(numbers)}, batch_size=[]
        )
        value_operator(next_tensor_dict)

        # Reward/Penalty Rules
        if max_index == correct_index:
            score = 10
            number_of_correct_decisions += 1
        else:
            score = -1
        reward = torch.FloatTensor([[score]])

        # Note that we need to use batched input, and the output will be in batched form.
        advantage, value_target = generalized_advantage_estimate(
            gamma=0.98,
            lmbda=0.95,
            state_value=current_tensor_dict["value"].unsqueeze(0),
            next_state_value=next_tensor_dict["value"].unsqueeze(0),
            reward=reward,
            done=torch.BoolTensor([[1]]),
            terminated=torch.BoolTensor([[1]]),
        )
        current_tensor_dict["advantage"] = advantage.squeeze(0)
        current_tensor_dict["value_target"] = value_target.squeeze(0)

        loss_tensor_dict = loss_module(current_tensor_dict)
        loss_critic = loss_tensor_dict["loss_critic"]
        loss_entropy = loss_tensor_dict["loss_entropy"]
        loss_objective = loss_tensor_dict["loss_objective"]
        # loss_entropy is used as returned by ClipPPOLoss (no extra 0.01
        # factor) to keep the actions more random and encourage exploration.
        loss = loss_critic + loss_entropy + loss_objective
        total_loss += loss

        print(f"episode: {epoch}, score: {score}, max_num: {numbers[max_index]}")

        # It's important to let the agent explore the environment enough so that it can experience the rewards.
        # That's why we optimize the models once every `update_interval` epochs.
        # Testing (epoch + 1) makes each update cover a full batch of
        # rollouts: no update after the very first sample, and the final
        # batch is applied instead of being silently dropped.
        if (epoch + 1) % update_interval == 0:
            total_loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            total_loss = 0

    print(f"Accuracy = {number_of_correct_decisions / max_epoch}")
# Run the training only when executed as a script, not when imported.
if __name__ == "__main__":
    main()