I have the following code which works fine, but only for a single batch, because I am using next(iter). I have created a TensorFlow dataset that, for my problem, must return 3 values per element (i.e. X: [x, y, z]), but only the x value needs to be passed to the model. I need to keep all 3 values packed together because I will use y and z later. The problem is that when I call fit, I have to somehow separate these 3 values so that the network architecture is invoked correctly. So my question is: how can I call fit with a PrefetchDataset in this situation?
import tensorflow as tf
import numpy as np
from tensorflow.keras.layers import Input, Dense, Activation, \
    Conv2DTranspose, Conv2D, Reshape
from tensorflow.keras.models import Model

AUTOTUNE = tf.data.experimental.AUTOTUNE

def scale(X, a=-1, b=1, dtype='float32'):
    if a > b:
        a, b = b, a
    xmin = tf.cast(tf.math.reduce_min(X), dtype=dtype)
    xmax = tf.cast(tf.math.reduce_max(X), dtype=dtype)
    X = (X - xmin) / (xmax - xmin)
    scaled = X * (b - a) + a
    return scaled, xmin, xmax

def set_shape_b(x, y, z):
    x = tf.reshape(x, [16, 16, 2])
    y = tf.reshape(y, [1])
    z = tf.reshape(z, [1])
    return x, y, z

def set_shape_a(x, y, z):
    x = tf.reshape(x, [4, 4, 2])
    y = tf.reshape(y, [1])
    z = tf.reshape(z, [1])
    return x, y, z

def First(lr):
    inp = Input(lr)
    x = Dense(16)(inp)
    x = Reshape((4, 4, 16))(x)
    x = Conv2DTranspose(2, kernel_size=3, strides=2, padding='same')(x)
    x = Conv2DTranspose(2, kernel_size=3, strides=2, padding='same')(x)
    output = Activation('tanh')(x)
    model = Model(inp, output, name='First')
    return model

def Second(hr):
    inp = Input(hr)
    x = Dense(16)(inp)
    x = Conv2D(2, kernel_size=3, strides=2, padding='same')(x)
    x = Conv2D(2, kernel_size=3, strides=2, padding='same')(x)
    output = Dense(1, activation='sigmoid')(x)
    model = Model(inputs=inp, outputs=output, name='Second')
    return model

def build_model(First, Second):
    inp = Input(shape=INP)
    gen = First(inp)
    output = Second(gen)
    model = Model(inputs=inp, outputs=[gen, output], name='model')
    return model

# Preprocess --------------- #
a = np.random.random((20, 4, 4, 2)).astype('float32')
b = np.random.random((20, 16, 16, 2)).astype('float32')

dataset_a = tf.data.Dataset.from_tensor_slices(a)
dataset_b = tf.data.Dataset.from_tensor_slices(b)

dataset_b = dataset_b.map(lambda x: tf.py_function(scale,
                                                   [x],
                                                   (tf.float32, tf.float32, tf.float32)))
dataset_b = dataset_b.map(set_shape_b)

dataset_a = dataset_a.map(lambda x: tf.py_function(scale,
                                                   [x],
                                                   (tf.float32, tf.float32, tf.float32)))
dataset_a = dataset_a.map(set_shape_a)

dataset_ones = tf.data.Dataset.from_tensor_slices(tf.ones((len(b), 4, 4, 1)))

dataset = tf.data.Dataset.zip((dataset_a, (dataset_b, dataset_ones)))
dataset = dataset.cache()
dataset = dataset.batch(2)
dataset = dataset.prefetch(buffer_size=AUTOTUNE)

# Prepare models -------------------- #
INP = (4, 4, 2)
OUT = (16, 16, 2)

first = First(INP)
second = Second(OUT)
model = build_model(first, second)
model.compile(loss=['mse', 'binary_crossentropy'],
              optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4))

train_l, (train_h, train_ones) = next(iter(dataset))

# train ------------------
model.fit(train_l[0],
          [train_h[0], train_ones],
          epochs=2)
Update
import matplotlib.pyplot as plt

def rescale(X_scaled, xmin, xmax):
    X = (xmax - xmin) * (X_scaled + 1) / 2.0 + xmin
    return X

class PlotCallback(tf.keras.callbacks.Callback):
    def __init__(self, image, xmin, xmax, model):
        self.image = image
        self.xmin = xmin
        self.xmax = xmax
        self.model = model

    def on_epoch_end(self, epoch, logs=None):
        preds = self.model.predict(self.image)
        y_pred = preds[0]
        y_pred = rescale(y_pred, self.xmin, self.xmax)
        fig, ax = plt.subplots(figsize=(14, 10))
        ax.imshow(y_pred[0][:, :, 0])
        plt.close()
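For reference, rescale inverts the scale function defined earlier (which maps values to [-1, 1]); a quick sanity check of the round trip, using an arbitrary random tensor:

x = tf.random.uniform((4, 4, 2))
scaled, xmin, xmax = scale(x)
restored = rescale(scaled, xmin, xmax)
# should print ~0.0, up to floating-point error
print(float(tf.reduce_max(tf.abs(restored - x))))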
I am using the functions above, and when I call fit I would like something like this:
model.fit(
    dataset,
    validation_data=dataset,
    epochs=2,
    callbacks=[PlotCallback(here_the_dataset_a_scaled_values,
                            xmin_from_dataset_a,
                            xmax_from_dataset_b, model)]
)
Following the comments above, to solve your problem you can apply a custom mapping function so that only the target value is returned. Also, take a look at tf.data.Dataset.map for reference.
def set_shape(x, y, z, dims):
    x = tf.reshape(x, dims)
    y = tf.reshape(y, [1])
    z = tf.reshape(z, [1])
    return x, y, z

dataset_a = dataset_a.map(lambda x, y, z: set_shape(x, y, z, dims=[4, 4, 2]))
dataset_b = dataset_b.map(lambda x, y, z: set_shape(x, y, z, dims=[16, 16, 2]))

def only_scale(x, y, z):
    return x

dataset_a = dataset_a.map(only_scale)
dataset_b = dataset_b.map(only_scale)
Zip and batch the data:
dataset = tf.data.Dataset.zip(
    (dataset_a, (dataset_b, dataset_ones))
)
dataset = dataset.cache()
dataset = dataset.batch(2)
dataset = dataset.prefetch(buffer_size=AUTOTUNE)
a, b = next(iter(dataset))
a.shape, b[0].shape, b[1].shape
(TensorShape([2, 4, 4, 2]),
 TensorShape([2, 16, 16, 2]),
 TensorShape([2, 4, 4, 1]))
Now we can pass it to the fit method.
# train ------------------
model.fit(
    dataset,
    epochs=2
)
Epoch 1/2
2s 6ms/step - loss: 1.0283 - First_loss: 0.3368 - Second_loss: 0.6914
Epoch 2/2
0s 4ms/step - loss: 1.0228 - First_loss: 0.3367 - Second_loss: 0.6860
As mentioned in the comments, .map(only_scale) cannot be used when we need to receive (scale, xmin, xmax) during training in order to rescale later. At the same time, we cannot pass that data format to a model that does not expect such an input specification; in other words, the model code knows nothing about xmin and xmax.
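To make the mismatch concrete, here is a quick inspection, a sketch that assumes dataset_a and dataset_b are the versions before the .map(only_scale) step, so each element is still a (scaled, xmin, xmax) tuple. The element spec carries xmin / xmax tensors for which the plain functional model has no inputs:

full_dataset = tf.data.Dataset.zip(
    (dataset_a, (dataset_b, dataset_ones))
).batch(2)
print(full_dataset.element_spec)
# Roughly:
# ((TensorSpec(shape=(None, 4, 4, 2), ...),     # x
#   TensorSpec(shape=(None, 1), ...),           # xmin
#   TensorSpec(shape=(None, 1), ...)),          # xmax
#  ((TensorSpec(shape=(None, 16, 16, 2), ...),  # y, plus its own xmin / xmax
#    TensorSpec(shape=(None, 1), ...),
#    TensorSpec(shape=(None, 1), ...)),
#   TensorSpec(shape=(None, 4, 4, 1), ...)))    # ones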
In this case, there are two workarounds. One is to use a custom training loop in Keras, and the other is to override the train_step method of the fit function. Let's try the second one; in that case we do not need the .map(only_scale) method from the data API. See the Keras references on overriding the fit method.
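Before that, for completeness, a minimal sketch of the first workaround (a custom training loop) might look like the following. It assumes the zipped dataset keeps the (scale, xmin, xmax) tuples (i.e. the only_scale maps are skipped), that model is the plain build_model(first, second) output, and that the losses mirror the compile() call above:

mse = tf.keras.losses.MeanSquaredError()
bce = tf.keras.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(learning_rate=1e-4)

@tf.function
def train_on_batch(batch):
    # batch: ((x, xmin, xmax), ((y, y_xmin, y_xmax), ones))
    (x, xmin, xmax), ((y, y_xmin, y_xmax), ones) = batch
    with tf.GradientTape() as tape:
        gen, out = model(x, training=True)
        loss = mse(y, gen) + bce(ones, out)
    grads = tape.gradient(loss, model.trainable_variables)
    opt.apply_gradients(zip(grads, model.trainable_variables))
    return loss

for epoch in range(2):
    for batch in dataset:
        loss = train_on_batch(batch)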
Let's build a custom model to override train_step (and also test_step, for validation data). FYI, there is also predict_step.
from tensorflow import keras

class CustomFitter(keras.Model):
    def __init__(self, model, **kwargs):
        super().__init__(**kwargs)
        self.model = model

    def call(self, inputs):
        return self.model(inputs)

    def unpack(self, data):
        x, y = data
        # x: dataset_a
        # y: (dataset_b, dataset_ones)
        # dataset_a / dataset_b: (scale, xmin, xmax)
        scale_y = y[0][0]
        ones_y = y[1]
        y = (scale_y, ones_y)
        x = x[0]
        return x, y

    def train_step(self, data):
        x, y = self.unpack(data)
        return super().train_step((x, y))

    def test_step(self, data):
        x, y = self.unpack(data)
        return super().test_step((x, y))
Next, we can do:
model = build_model(first, second)
model = CustomFitter(model)
model.compile(
    loss=['mse', 'binary_crossentropy'],
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4)
)
Now we can fit the dataset (without using the only_scale method):
model.fit(
    dataset,
    validation_data=dataset,
    epochs=2
)
Epoch 1/2
45ms/step - loss: 1.0278 - output_1_loss: 0.3358 - output_2_loss: 0.6919 - val_loss: 1.0262 - val_output_1_loss: 0.3357 - val_output_2_loss: 0.6905
Epoch 2/2
8ms/step - loss: 1.0249 - output_1_loss: 0.3356 - output_2_loss: 0.6893 - val_loss: 1.0234 - val_output_1_loss: 0.3355 - val_output_2_loss: 0.6879
Regarding using xmin and xmax in the callback to rescale the predicted array and plot it, we can do something like the following: we store these values from the validation dataset during training, use them in on_epoch_end, and reset them in on_epoch_begin for the next epoch. First we will do:
from tensorflow.experimental import numpy as tnp

with tf.device('/CPU:0'):
    scaling_xmin = tf.Variable(
        tnp.empty((0, 1), dtype=tf.float32), shape=[None, 1], trainable=False
    )
    scaling_xmax = tf.Variable(
        tnp.empty((0, 1), dtype=tf.float32), shape=[None, 1], trainable=False
    )
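As an aside, declaring the variables with shape=[None, 1] is what lets them grow across batches via assign plus concat; a tiny illustration with made-up values:

v = tf.Variable(tnp.empty((0, 1), dtype=tf.float32), shape=[None, 1])
v.assign(tf.concat([v, [[0.5]]], axis=0))
v.assign(tf.concat([v, [[0.9]]], axis=0))
print(v.numpy())  # [[0.5], [0.9]]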
class CustomFitter(keras.Model):
    ....

    def unpack(self, data, data_src='valid'):
        x, y = data
        # x: dataset_a
        # y: (dataset_b, dataset_ones)
        # dataset_a / dataset_b: (scale, xmin, xmax)
        if data_src == 'valid':
            scaling_xmin.assign(
                tf.concat([scaling_xmin, x[1]], axis=0)
            )
            scaling_xmax.assign(
                tf.concat([scaling_xmax, x[2]], axis=0)
            )
        scale_y = y[0][0]
        ones_y = y[1]
        y = (scale_y, ones_y)
        x = x[0]
        return x, y

    def train_step(self, data):
        x, y = self.unpack(data, data_src='train')
        return super().train_step((x, y))

    def test_step(self, data):
        x, y = self.unpack(data, data_src='valid')
        return super().test_step((x, y))
Now, in the callback, we will do:
class PlotCallback(tf.keras.callbacks.Callback):
    def __init__(self, image):
        self.image = image

    def on_epoch_begin(self, epoch, logs=None):
        # reset the stored xmin / xmax for the next epoch
        scaling_xmin.assign(
            tf.Variable(
                tnp.empty((0, 1), dtype=tf.float32), shape=[None, 1]
            )
        )
        scaling_xmax.assign(
            tf.Variable(
                tnp.empty((0, 1), dtype=tf.float32), shape=[None, 1]
            )
        )

    def on_epoch_end(self, epoch, logs=None):
        preds = self.model.predict(self.image)
        y_pred = preds[0]
        # assuming y_pred.shape[0] == xmin.shape[0] == xmax.shape[0]
        for yp, xmin, xmax in zip(
            y_pred, scaling_xmin.numpy(), scaling_xmax.numpy()
        ):
            yp = rescale(
                yp, xmin, xmax
            )
            fig, ax = plt.subplots(figsize=(14, 10))
            ax.imshow(yp[:, :, 0])
            break  # plot only the first sample
        plt.show()
Next, we can use this callback. Note that we are passing a plain array as the single input. If PlotCallback(dataset) is passed instead, make sure to implement predict_step, which would be almost the same as test_step in the model code above.
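A minimal sketch of such a predict_step (the unpacking below is an assumption about what the zipped dataset yields; adjust it to the actual element structure):

class CustomFitter(keras.Model):
    ....

    def predict_step(self, data):
        # each element: ((x, xmin, xmax), ((y, y_xmin, y_xmax), ones))
        if isinstance(data, tuple):
            data = data[0]      # drop the target side
            if isinstance(data, tuple):
                data = data[0]  # drop xmin / xmax
        return self(data, training=False)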
a = np.random.random((20, 4, 4, 2)).astype('float32')
model.fit(
    dataset,
    validation_data=dataset,
    callbacks=[PlotCallback(a)],
    epochs=2
)
107ms/step - loss: 1.0251 - output_1_loss: 0.3387 - output_2_loss: 0.6864 - val_loss: 1.0239 - val_output_1_loss: 0.3386 - val_output_2_loss: 0.6853
[plot will be displayed]