我正在尝试创建一个模型来根据天气数据预测能源电网负载(电网消耗的净电量)。在生产中,我们没有负载数据来进行标准批量预测。我们正在尝试一种自回归方法,以便我们可以向其提供最后报告的负载读数和接下来 24 小时的预测天气数据,以生成 24 小时的负载预测。
我正在使用本教程,它建议对模型类进行子类化以进行逐步预测。我相信 model.fit 文档也建议子类化。
上面的教程创建了一个名为 Feedback 的 keras.model 子类,并覆盖了 model.call() 方法,该方法在训练和预测期间被调用。
def call(self, inputs, training=None):
# Use a TensorArray to capture dynamically unrolled outputs.
predictions = []
# Initialize the LSTM state.
prediction, state = self.warmup(inputs)
# Insert the first prediction.
predictions.append(prediction)
# Run the rest of the prediction steps.
for n in range(1, self.out_steps):
# Use the last prediction as input.
x = prediction
# Execute one lstm step.
x, state = self.lstm_cell(x, states=state,
training=training)
# Convert the lstm output to a prediction.
prediction = self.dense(x)
# Add the prediction to the output.
predictions.append(prediction)
# predictions.shape => (time, batch, features)
predictions = tf.stack(predictions)
# predictions.shape => (batch, time, features)
predictions = tf.transpose(predictions, [1, 0, 2])
return predictions
调用 fit() 时,我传入数据集进行训练和验证。通过 keras.utils.timeseries_dataset_from_array() 创建的数据集。
history = model.fit(dataset_train, epochs=epochs,
validation_data=dataset_val,
callbacks=[es_callback, modelckpt_callback])
我的数据形状是每小时的时间序列数据,11列天气数据和1列目标。我使用的窗口大小为两个小时。
我的问题是,for 循环中的预测调用似乎仅使用先前的预测作为输入。我不明白他们如何访问训练或验证数据集。
我尝试在 Pycharm 调试器中查找访问数据集的方法,但没有找到任何内容。我也尝试寻找进行类似子类化的人,但本教程是我能找到的最好的教程。
如果需要运行示例,该教程将介绍数据集创建和子类实现。我希望有人可以解释如何正确地对 keras.model 进行子类化(以与该教程类似的方式)以获取多列输入并进行自回归预测。 call()方法的重写是我最困惑的地方。
prediction, state = self.warmup(inputs)
获取整个输入并生成单个初始预测。然后用它来预测下一个时间步骤。 请注意,如果 num_features 不等于目标形状,则最后需要一个下/上采样层。
我编辑的版本:
class FeedBack(tf.keras.Model):
def __init__(self, units, out_steps, num_features, num_outputs):
super().__init__()
self.out_steps = out_steps
self.units = units
self.lstm_cell = tf.keras.layers.LSTMCell(units)
# Also wrap the LSTMCell in an RNN to simplify the `warmup` method.
self.lstm_rnn = tf.keras.layers.RNN(self.lstm_cell, return_state=True)
self.dense = tf.keras.layers.Dense(num_features)
self.output_layer = tf.keras.layers.Dense(num_outputs)
def warmup(self, inputs):
# inputs.shape => (batch, time, features)
# x.shape => (batch, lstm_units)
x, *state = self.lstm_rnn(inputs)
# predictions.shape => (batch, features)
prediction = self.dense(x)
return prediction, state
def call(self, inputs, training=None):
# Use a TensorArray to capture dynamically unrolled outputs.
predictions = []
# Initialize the LSTM state.
prediction, state = self.warmup(inputs)
# Insert the first prediction.
predictions.append(prediction)
# Run the rest of the prediction steps.
for n in range(1, self.out_steps):
# Use the last prediction as input.
x = prediction
# Execute one lstm step.
x, state = self.lstm_cell(x, states=state,
training=training)
# Convert the lstm output to a prediction.
prediction = self.dense(x)
# Add the prediction to the output.
predictions.append(prediction)
# predictions.shape => (time, batch, features)
predictions = tf.stack(predictions)
# predictions.shape => (batch, time, features)
predictions = tf.transpose(predictions, [1, 0, 2])
predictions = self.output_layer(predictions)
return predictions