Troubleshooting LSTM forecasting: what am I doing wrong?

Problem description

My LSTM has three inputs (x, y, z). The model is used to predict the next time step of z, with a lookback window of 9 time steps. I then need to forecast further time steps of z with a recursive function. However, when I plot the predicted z values, the results are poor. The data comes from a CSV file that I cannot share.

Here is my code:

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

# x, y, z are assumed to be 1-D arrays of equal length loaded from the CSV (not shown).
# z is the prediction target, so it is stacked as column 0 to match the indexing
# below (the label and the recursive update both use column 0)
data = np.column_stack((z, x, y))
def df_to_X_y(data, window_size=9):
    X = []
    y = []
    for i in range(len(data) - window_size):
        row = data[i:i + window_size]        # window of 9 time steps, all 3 features
        X.append(row)
        label = data[i + window_size, 0]     # next-step z (column 0)
        y.append(label)
    return np.array(X), np.array(y)

X, y = df_to_X_y(data)
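# Quick shape check (a sketch with synthetic stand-in data, since the real
# CSV cannot be shared):
#   demo = np.column_stack([np.arange(100.0)] * 3)
#   Xd, yd = df_to_X_y(demo)    # Xd.shape == (91, 9, 3), yd.shape == (91,)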

split_ratio = 0.8
split_idx = int(len(X) * split_ratio)  

X_train = X[:split_idx]  
X_test = X[split_idx:]   

y_train = y[:split_idx] 
y_test = y[split_idx:]    

X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)
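# Resulting tensor shapes (window_size=9, 3 features):
#   X_train_tensor: (split_idx, 9, 3),          y_train_tensor: (split_idx,)
#   X_test_tensor:  (len(X) - split_idx, 9, 3), y_test_tensor:  (len(X) - split_idx,)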

class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, target_size):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, target_size)  # Output layer for regression

    def forward(self, x):
        lstm_out, _ = self.lstm(x)           # hidden and cell states are discarded
        out = self.fc(lstm_out[:, -1, :])    # regress on the last time step only
        return out
    
    def forecast(self, initial_input, num_steps):
        predictions = []  # Store forecasted z values
        current_input = initial_input.clone()  # Clone to avoid modifying the original input

        for _ in range(num_steps):
            next_output = self.forward(current_input)  
            predictions.append(next_output.unsqueeze(1))  # Add time dimension
            
            next_input = current_input.clone()  # Clone the current input
            next_input[:, :-1, :] = current_input[:, 1:, :]  # Shift the window by 1 step
            next_input[:, -1, 0] = next_output.squeeze(1)  # Update z with the prediction
            # Leave x and y unchanged in next_input[:, -1, 1:] (automatically retained)

            current_input = next_input  # Update for the next iteration

        return torch.cat(predictions, dim=1)  # Concatenate predictions along the time dimension


input_size = 3  
hidden_size = 50  
num_layers = 3  
learning_rate = 0.04
num_epochs = 50  
target_size = 1

model = LSTMModel(input_size, hidden_size, num_layers, target_size)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

train_loss_values = []
test_loss_values = []


for epoch in range(num_epochs):
    model.train()  
    optimizer.zero_grad()  
    y_pred = model(X_train_tensor) 

    train_loss = criterion(y_pred.squeeze(), y_train_tensor) 
    train_loss.backward()  
    optimizer.step()  
    train_loss_values.append(train_loss.item())

    model.eval()  
    with torch.no_grad():
        y_test_pred = model(X_test_tensor)
        test_loss = criterion(y_test_pred.squeeze(), y_test_tensor)

    test_loss_values.append(test_loss.item())

    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch + 1}/{num_epochs}], Train Loss: {train_loss.item():.4f}, Test Loss: {test_loss.item():.4f}')
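
# Aside (not in the original post): this loop does full-batch gradient descent,
# one optimizer step per epoch. A hypothetical minibatch variant for comparison:
#   from torch.utils.data import DataLoader, TensorDataset
#   loader = DataLoader(TensorDataset(X_train_tensor, y_train_tensor),
#                       batch_size=32, shuffle=True)  # batch_size is an assumption
#   for xb, yb in loader:
#       optimizer.zero_grad()
#       loss = criterion(model(xb).squeeze(), yb)
#       loss.backward()
#       optimizer.step()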

# Number of steps to forecast
num_steps = 9
# Use the last sequence from the test set as the initial input
initial_input = X_test_tensor[-1].unsqueeze(0) # Shape: (1, seq_length, input_size)

# Perform multi-step forecasting
model.eval()
with torch.no_grad():
    forecasted_values = model.forecast(initial_input, num_steps)
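
To judge the forecast visually, one option is to plot it after the recent observed values (a minimal sketch assuming matplotlib is installed and the test split has at least 50 points; the first forecasted step corresponds to the last label in y_test):

import matplotlib.pyplot as plt

observed = y_test[-50:]                          # last 50 observed z values
forecast = forecasted_values.squeeze().numpy()   # shape (9,)
plt.plot(range(len(observed)), observed, label='observed z')
# overlap by one point: forecast step 1 corresponds to the last observed label
plt.plot(range(len(observed) - 1, len(observed) - 1 + len(forecast)),
         forecast, label='forecast')
plt.legend()
plt.show()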
Tags: pytorch, lstm, forecasting
1 Answer

Your forecast code does not use the hidden state from the previous time step, so the model cannot carry any sequence information across steps. You need to design the model so that a hidden state can be passed into the forward method and reused during inference. Something like this:

class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, target_size):
        super(LSTMModel, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, target_size)  # Output layer for regression

    def forward(self, x, hidden=None):
        if hidden is None:
            hidden = self.get_hidden(x)
            
        lstm_out, hidden = self.lstm(x, hidden)
        out = self.fc(lstm_out[:, -1, :])  
        return out, hidden
    
    def get_hidden(self, x):
        # hidden state is of shape (n_layers, batch_size, d_hidden)
        # batch size is `x.shape[0]` for `batch_first=True`
        hidden = (
                torch.zeros(self.num_layers, x.shape[0], self.hidden_size, device=x.device),
                torch.zeros(self.num_layers, x.shape[0], self.hidden_size, device=x.device),
                )
        return hidden 
    
    def forecast(self, initial_input, num_steps):
        predictions = []  # Store forecasted z values
        current_input = initial_input.clone()  # Clone to avoid modifying the original input
        hidden = None  # initial hidden state

        with torch.no_grad():
            for _ in range(num_steps):
                # use and update the hidden state for the forward pass
                next_output, hidden = self.forward(current_input, hidden)
                pred = next_output.unsqueeze(1)  # (batch, 1, target_size)
                predictions.append(pred)
                # the LSTM still expects input_size features at every step, so build
                # the next input from the predicted z plus the last known x and y values
                current_input = torch.cat([pred, current_input[:, -1:, 1:]], dim=2)

        return torch.cat(predictions, dim=1)  # Concatenate predictions along the time dimension
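
With this version the calling code changes slightly, since forward now returns a tuple. A small usage sketch (hyperparameters taken from the question; note the training loop must unpack two values):

model = LSTMModel(input_size=3, hidden_size=50, num_layers=3, target_size=1)

# during training, unpack both return values
y_pred, _ = model(X_train_tensor)

# multi-step forecasting, seeded from the last test window as before
model.eval()
initial_input = X_test_tensor[-1].unsqueeze(0)   # shape (1, 9, 3)
forecasted_values = model.forecast(initial_input, num_steps=9)
print(forecasted_values.shape)                   # torch.Size([1, 9, 1])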