使用火炬时,蛇游戏正在绕圈移动并且不收集食物/奖励。我该如何解决这个问题?

问题描述 投票:0回答:1

所以我用 pygame 制作了一个蛇游戏,蛇从中间开始,收集食物后变得更大。

import pygame
import random
from enum import Enum
from collections import namedtuple
import numpy as np

pygame.init()
font = pygame.font.Font(None, 36)

class Direction(Enum):
    RIGHT = 1
    LEFT = 2
    UP = 3
    DOWN = 4

Point = namedtuple('Point', 'x y')

# RGB colors
WHITE = (255, 255, 255)
RED = (200, 0, 0)
BLUE1 = (0, 0, 255)
BLUE2 = (0, 100, 255)
BLACK = (0, 0, 0)

BLOCK_SIZE = 20
SPEED = 10

class SnakeGameAI:
    def __init__(self, w=640, h=480):
        self.w = w
        self.h = h
        self.reset()

    def reset(self):
        self.direction = Direction.RIGHT
        self.head = Point(self.w // 2, self.h // 2)
        self.snake = [self.head,
                      Point(self.head.x - BLOCK_SIZE, self.head.y),
                      Point(self.head.x - (2 * BLOCK_SIZE), self.head.y)]
        self.score = 0
        self.food = None
        self._place_food()
        self.frame_iteration = 0
        return self.get_state()

    def _place_food(self):
        while True:
            x = random.randint(0, (self.w - BLOCK_SIZE) // BLOCK_SIZE) * BLOCK_SIZE
            y = random.randint(0, (self.h - BLOCK_SIZE) // BLOCK_SIZE) * BLOCK_SIZE
            self.food = Point(x, y)
            if self.food not in self.snake:
                break

    def play_step(self, action):
        self.frame_iteration += 1
        self._move(action)
        self.snake.insert(0, self.head)

        reward = 0
        done = False
        if self.is_collision() or self.frame_iteration > 100 * len(self.snake):
            done = True
            reward = -10
            return self.get_state(), reward, done

        if self.head == self.food:
            self.score += 1
            reward = 10
            self._place_food()
        else:
            self.snake.pop()

        return self.get_state(), reward, done

    def is_collision(self, pt=None):
        if pt is None:
            pt = self.head
        if pt.x >= self.w or pt.x < 0 or pt.y >= self.h or pt.y < 0:
            return True
        if pt in self.snake[1:]:
            return True
        return False

    def _move(self, action):
        clock_wise = [Direction.RIGHT, Direction.DOWN, Direction.LEFT, Direction.UP]
        idx = clock_wise.index(self.direction)

        if np.array_equal(action, [1, 0, 0]):  # Move straight
            new_dir = clock_wise[idx]
        elif np.array_equal(action, [0, 1, 0]):  # Turn right
            new_dir = clock_wise[(idx + 1) % 4]
        else:  # Turn left
            new_dir = clock_wise[(idx - 1) % 4]

        # Prevent the snake from reversing
        if (self.direction == Direction.RIGHT and new_dir == Direction.LEFT) or \
           (self.direction == Direction.LEFT and new_dir == Direction.RIGHT) or \
           (self.direction == Direction.UP and new_dir == Direction.DOWN) or \
           (self.direction == Direction.DOWN and new_dir == Direction.UP):
            new_dir = self.direction
        
        self.direction = new_dir

        x = self.head.x
        y = self.head.y
        if self.direction == Direction.RIGHT:
            x += BLOCK_SIZE
        elif self.direction == Direction.LEFT:
            x -= BLOCK_SIZE
        elif self.direction == Direction.DOWN:
            y += BLOCK_SIZE
        elif self.direction == Direction.UP:
            y -= BLOCK_SIZE

        self.head = Point(x, y)

    def get_state(self):
        head_x, head_y = self.head.x, self.head.y
        food_x, food_y = self.food.x, self.food.y
        direction = self.direction.value
        # Features: normalized distances and direction
        return np.array([
            (food_x - head_x) / self.w,
            (food_y - head_y) / self.h,
            direction / 4,
            len(self.snake) / ((self.w // BLOCK_SIZE) * (self.h // BLOCK_SIZE))
        ])

然后使用pytorch和cuda,我尝试使用q_learning训练AI玩:

import torch
import torch.nn as nn
import torch.optim as optim
import os
import numpy as np

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class Linear_QNet(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.linear1 = nn.Linear(input_size, hidden_size)
        self.linear2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = torch.relu(self.linear1(x))
        x = self.linear2(x)
        return x

    def save(self, file_name='model.pth'):
        model_folder_path = './model'
        if not os.path.exists(model_folder_path):
            os.makedirs(model_folder_path)
        file_name = os.path.join(model_folder_path, file_name)
        torch.save(self.state_dict(), file_name)

class QTrainer:
    def __init__(self, model, lr, gamma, epsilon_start=1.0, epsilon_end=0.01, epsilon_decay=0.995):
        self.lr = lr
        self.gamma = gamma
        self.epsilon = epsilon_start  # Initial exploration rate
        self.epsilon_end = epsilon_end  # Minimum exploration rate
        self.epsilon_decay = epsilon_decay  # Decay factor for epsilon
        self.model = model.to(device)  # Ensure model is on the correct device
        self.optimizer = optim.Adam(model.parameters(), lr=self.lr)
        self.criterion = nn.MSELoss()

    def get_action(self, state):
        if np.random.rand() < self.epsilon:
            # Explore: select a random action
            return np.random.randint(0, 3)  # Assuming 3 possible actions
        else:
            # Exploit: select the best action based on the model
            state = torch.tensor(state, dtype=torch.float).to(device)
            with torch.no_grad():
                q_values = self.model(state.unsqueeze(0))  # Add batch dimension
                return torch.argmax(q_values).item()

    def train_step(self, state, action, reward, next_state, done):
        state = torch.tensor(state, dtype=torch.float).to(device)
        next_state = torch.tensor(next_state, dtype=torch.float).to(device)
        action = torch.tensor(action, dtype=torch.long).to(device)
        reward = torch.tensor(reward, dtype=torch.float).to(device)
        done = torch.tensor(done, dtype=torch.float).to(device)

        if len(state.shape) == 1:
            state = torch.unsqueeze(state, 0)
            next_state = torch.unsqueeze(next_state, 0)
            action = torch.unsqueeze(action, 0)
            reward = torch.unsqueeze(reward, 0)
            done = (done, )

        pred = self.model(state)
        target = pred.clone()

        for idx in range(len(done)):
            Q_new = reward[idx]
            if not done[idx]:
                Q_new = reward[idx] + self.gamma * torch.max(self.model(next_state[idx]))

            target[idx][torch.argmax(action[idx]).item()] = Q_new

        self.optimizer.zero_grad()
        loss = self.criterion(target, pred)
        loss.backward()
        self.optimizer.step()

        # Update epsilon (exploration rate)
        self.epsilon = max(self.epsilon_end, self.epsilon * self.epsilon_decay)

最后,我想运行并行环境,以便模型训练得更快,而不需要花费几个小时来学习如何玩。这是main.py

import pygame
import numpy as np
import torch
from snake_game import SnakeGameAI
from q_learning import Linear_QNet, QTrainer

# Hyperparameters
INPUT_SIZE = 4
HIDDEN_SIZE = 256
OUTPUT_SIZE = 3
LEARNING_RATE = 0.001
GAMMA = 0.9
NUM_EPISODES = 1000
NUM_ENVS = 1  # Adjust this for parallel environments if needed

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Colors
BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
RED = (200, 0, 0)
BLUE1 = (0, 0, 255)
BLUE2 = (0, 100, 255)
BLOCK_SIZE = 20

class ParallelEnvManager:
    def __init__(self, env_class, num_envs):
        self.envs = [env_class() for _ in range(num_envs)]

    def reset(self):
        return np.array([env.reset() for env in self.envs])

    def step(self, actions):
        next_state, rewards, done = [], [], []
        for env, action in zip(self.envs, actions):
            state, reward, d = env.play_step(action)
            next_state.append(state)
            rewards.append(reward)
            done.append(d)
        return np.array(next_state), np.array(rewards), np.array(done)

    def close(self):
        pygame.quit()

def main():
    pygame.init()
    width, height = 640, 480
    screen = pygame.display.set_mode((width, height))
    pygame.display.set_caption('Snake Q-Learning')
    clock = pygame.time.Clock()

    model = Linear_QNet(INPUT_SIZE, HIDDEN_SIZE, OUTPUT_SIZE).to(device)
    trainer = QTrainer(model, LEARNING_RATE, GAMMA)
    env_manager = ParallelEnvManager(SnakeGameAI, NUM_ENVS)

    font = pygame.font.Font(None, 36)
    running = True
    episode = 0

    while running and episode < NUM_EPISODES:
        state = env_manager.reset()
        done = np.zeros(NUM_ENVS, dtype=bool)
        while not np.all(done):
            actions = [trainer.get_action(s) for s in state]
            next_state, rewards, dones = env_manager.step(actions)

            trainer.train_step(state, actions, rewards, next_state, dones)
            state = next_state

            # Update display
            screen.fill(BLACK)
            for env in env_manager.envs:
                # Draw snake
                for pt in env.snake:
                    pygame.draw.rect(screen, BLUE1, pygame.Rect(pt.x, pt.y, BLOCK_SIZE, BLOCK_SIZE))
                    pygame.draw.rect(screen, BLUE2, pygame.Rect(pt.x + 4, pt.y + 4, 12, 12))
                # Draw food
                pygame.draw.rect(screen, RED, pygame.Rect(env.food.x, env.food.y, BLOCK_SIZE, BLOCK_SIZE))

            score_text = font.render(f"Episode: {episode} | Score: {env_manager.envs[0].score}", True, WHITE)
            screen.blit(score_text, (10, 10))
            pygame.display.flip()

            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    running = False
                    break

        episode += 1
        pygame.display.set_caption(f'Snake Q-Learning - Episode {episode}')

    env_manager.close()
    model.save('snake_model.pth')
    pygame.quit()

if __name__ == '__main__':
    main()

到目前为止,我已经尝试改变 q_learning,认为人工智能可能需要探索更多。我删除了 pycache,以便机器学习将基于新的 q_learning 算法重新开始。如果 cuda 不可用,我更新了 main.py 中的 torch.device 以使用 cpu。

我是机器学习/AI/pytorch的新手,发现了一个蛇AI游戏的视频。我想从头开始制作这个游戏,并通过制作自定义并行环境管理器类来允许并行环境,并拥有一个 GUI,这样我就可以看到 AI 玩游戏并学习。我认为这将是一个很好的启动程序,因此我可以根据我在 GUI 上看到的内容调整我的代码。

经过几个小时的调整代码,我不明白为什么人工智能只是绕圈移动,并且不想探索地图,甚至不想找到墙壁。

python python-3.x numpy pytorch pygame
1个回答
0
投票

奖励系统不平衡,在它开始改善之前,我建议你将死亡奖励设置为-300,每一步什么都不做-1,每个苹果+100。

这应该会促使您至少在开始时采取更多行动,然后当情况开始改善时,您可以在每一步中删除负面词。

def play_step(self, action):
        self.frame_iteration += 1
        self._move(action)
        self.snake.insert(0, self.head)

        reward = 0
        done = False
        if self.is_collision() or self.frame_iteration > 100 * len(self.snake):
            done = True
            reward = -300
            return self.get_state(), reward, done

        if self.head == self.food:
            self.score += 1
            reward = 100
            self._place_food()
        else:
            reward = -1
            self.snake.pop()

        return self.get_state(), reward, done
© www.soinside.com 2019 - 2024. All rights reserved.