My LSTM has three inputs (x, y, z) and is used to predict the next time step of z, with a lookback window of 9 time steps. I then need to predict further time steps of z recursively. However, when I plot the predicted z values, I get poor results. The data comes from a CSV file that I cannot share.
Here is my code:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

# Stack the series with the target z in column 0, so the label below and the
# forecast update later both refer to column 0
data = np.column_stack((z, x, y))

def df_to_X_y(data, window_size=9):
    X = []
    y = []
    for i in range(len(data) - window_size):
        row = data[i:i + window_size]
        X.append(row)
        label = data[i + window_size, 0]  # next-step z (column 0)
        y.append(label)
    return np.array(X), np.array(y)

X, y = df_to_X_y(data)
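As a sanity check on the windowing, here is a minimal sketch with synthetic stand-in data (the real CSV cannot be shared, so the array below is a hypothetical placeholder):

# With N rows and a 9-step window, X should have shape (N - 9, 9, 3)
# and y shape (N - 9,)
fake = np.random.rand(100, 3)  # stand-in for the real (z, x, y) columns
X_check, y_check = df_to_X_y(fake)
print(X_check.shape, y_check.shape)  # (91, 9, 3) (91,)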
split_ratio = 0.8
split_idx = int(len(X) * split_ratio)
X_train = X[:split_idx]
X_test = X[split_idx:]
y_train = y[:split_idx]
y_test = y[split_idx:]
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, target_size):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, target_size)  # Output layer for regression

    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        out = self.fc(lstm_out[:, -1, :])
        return out

    def forecast(self, initial_input, num_steps):
        predictions = []  # Store forecasted z values
        current_input = initial_input.clone()  # Clone to avoid modifying the original input
        for _ in range(num_steps):
            next_output = self.forward(current_input)
            predictions.append(next_output.unsqueeze(1))  # Add time dimension
            next_input = current_input.clone()  # Clone the current input
            next_input[:, :-1, :] = current_input[:, 1:, :]  # Shift the window by 1 step
            next_input[:, -1, 0] = next_output.squeeze(1)  # Update z with the prediction
            # x and y in next_input[:, -1, 1:] are retained from the shifted window
            current_input = next_input  # Update for the next iteration
        return torch.cat(predictions, dim=1)  # Concatenate predictions along the time dimension
input_size = 3
hidden_size = 50
num_layers = 3
learning_rate = 0.04
num_epochs = 50
target_size = 1

model = LSTMModel(input_size, hidden_size, num_layers, target_size)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

train_loss_values = []
test_loss_values = []

for epoch in range(num_epochs):
    model.train()
    optimizer.zero_grad()
    y_pred = model(X_train_tensor)
    train_loss = criterion(y_pred.squeeze(), y_train_tensor)
    train_loss.backward()
    optimizer.step()
    train_loss_values.append(train_loss.item())

    model.eval()
    with torch.no_grad():
        y_test_pred = model(X_test_tensor)
        test_loss = criterion(y_test_pred.squeeze(), y_test_tensor)
        test_loss_values.append(test_loss.item())

    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch + 1}/{num_epochs}], Train Loss: {train_loss.item():.4f}, Test Loss: {test_loss.item():.4f}')

# Number of steps to forecast
num_steps = 9

# Use the last sequence from the test set as the initial input
initial_input = X_test_tensor[-1].unsqueeze(0)  # Shape: (1, seq_length, input_size)

# Perform multi-step forecasting
model.eval()
with torch.no_grad():
    forecasted_values = model.forecast(initial_input, num_steps)
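For context, the poor-looking result comes from plotting this recursive forecast against the series; a minimal matplotlib sketch of such a plot (the original plotting code is not shown, so the variable names here are assumptions):

import matplotlib.pyplot as plt

forecast_z = forecasted_values.squeeze().numpy()  # the 9 predicted z values
history_z = X_test_tensor[-1, :, 0].numpy()       # the 9-step input window (z is column 0)

plt.plot(range(len(history_z)), history_z, label='observed z (input window)')
plt.plot(range(len(history_z), len(history_z) + len(forecast_z)),
         forecast_z, label='recursive forecast')
plt.legend()
plt.show()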
Your forecast code does not use the hidden state from the previous time step, so the model cannot make use of any sequence information. You need to design the model so the hidden state can be passed into the forward method and reused during inference. Something like this:
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, target_size):
        super(LSTMModel, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, target_size)  # Output layer for regression

    def forward(self, x, hidden=None):
        if hidden is None:
            hidden = self.get_hidden(x)
        lstm_out, hidden = self.lstm(x, hidden)
        out = self.fc(lstm_out[:, -1, :])
        return out, hidden

    def get_hidden(self, x):
        # The hidden state is an (h_0, c_0) pair, each of shape
        # (num_layers, batch_size, hidden_size); with batch_first=True
        # the batch size is x.shape[0]
        hidden = (
            torch.zeros(self.num_layers, x.shape[0], self.hidden_size, device=x.device),
            torch.zeros(self.num_layers, x.shape[0], self.hidden_size, device=x.device),
        )
        return hidden

    def forecast(self, initial_input, num_steps):
        predictions = []  # Store forecasted z values
        current_input = initial_input.clone()  # Clone to avoid modifying the original input
        hidden = None  # Initial hidden state (zeros on the first pass)
        with torch.no_grad():
            for _ in range(num_steps):
                # Use and update the hidden state on each forward pass
                next_output, hidden = self.forward(current_input, hidden)
                predictions.append(next_output.unsqueeze(1))  # Add a time dimension
                # Feed a one-step input back in: the LSTM expects all three
                # features, so keep the last known x and y and overwrite z
                # (column 0) with the new prediction
                next_step = current_input[:, -1:, :].clone()
                next_step[:, 0, 0] = next_output.squeeze(1)
                current_input = next_step
        return torch.cat(predictions, dim=1)  # Concatenate along the time dimension
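Note that forward now returns an (output, hidden) pair, so the training loop has to unpack it. A minimal usage sketch under the same assumptions as above (z in column 0, a 9-step window):

model = LSTMModel(input_size=3, hidden_size=50, num_layers=3, target_size=1)

# In the training loop, unpack the pair:
#     y_pred, _ = model(X_train_tensor)

model.eval()
initial_input = X_test_tensor[-1].unsqueeze(0)  # shape: (1, 9, 3)
forecasted_values = model.forecast(initial_input, num_steps=9)
print(forecasted_values.shape)  # torch.Size([1, 9, 1])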