How to pass a multi-output data loader to a single-input model


I have the following code, which works fine. The problem is that it only works for a single batch, because I am using next(iter).

I have created a TensorFlow dataset that has to return 3 values for my problem (i.e. X: [x, y, z]). I only need to pass the x value to the model, but I need to keep all 3 values packed together because I will use y and z later. Now, the problem is that when I call fit I have to somehow split these 3 values apart so that the network architecture is fed correctly. So my question is: how can I call fit with this PrefetchDataset in such a case?
import tensorflow as tf
import numpy as np
from tensorflow.keras.layers import Input, Dense,  Activation, \
    Conv2DTranspose, Conv2D, Reshape
from tensorflow.keras.models import Model

AUTOTUNE = tf.data.experimental.AUTOTUNE

def scale(X, a=-1, b=1, dtype='float32'):
    if a > b:
        a, b = b, a
    xmin = tf.cast(tf.math.reduce_min(X), dtype=dtype)
    xmax = tf.cast(tf.math.reduce_max(X), dtype=dtype)
    X = (X - xmin) / (xmax - xmin)
    scaled = X * (b - a) + a
    return scaled, xmin, xmax

def set_shape_b(x, y, z):
    x = tf.reshape(x,  [16, 16, 2])
    y = tf.reshape(y, [1])
    z = tf.reshape(z, [1])
    return x, y, z

def set_shape_a(x, y, z):
    x = tf.reshape(x,  [4, 4, 2])
    y = tf.reshape(y, [1])
    z = tf.reshape(z, [1])
    return x, y, z

def First(lr):
    inp = Input(lr)
    x = Dense(16)(inp)
    x = Reshape((4, 4, 16))(x)
    x = Conv2DTranspose(2, kernel_size=3, strides=2, padding='same')(x)
    x = Conv2DTranspose(2, kernel_size=3, strides=2, padding='same')(x)
    output = Activation('tanh')(x)
    model = Model(inp, output, name='First')
    return model
    
def Second(hr):
    inp = Input(hr)
    x = Dense(16)(inp)
    x = Conv2D(2, kernel_size=3, strides=2, padding='same')(x)
    x = Conv2D(2, kernel_size=3, strides=2, padding='same')(x)
    output = Dense(1, activation='sigmoid')(x)
    model = Model(inputs=inp, outputs=output, name='Second')
    return model
    

def build_model(First, Second):
    inp = Input(shape=INP)
    gen = First(inp)
    output = Second(gen)
    model = Model(inputs=inp , outputs=[gen, output], name='model')
    return model

# Preproces --------------- #
a = np.random.random((20, 4, 4, 2)).astype('float32')
b = np.random.random((20, 16, 16, 2)).astype('float32')

dataset_a = tf.data.Dataset.from_tensor_slices(a)
dataset_b = tf.data.Dataset.from_tensor_slices(b)

dataset_b = dataset_b.map(lambda x: tf.py_function(scale,
                                                   [x], 
                                                   (tf.float32, tf.float32, tf.float32)))
dataset_b = dataset_b.map(set_shape_b)

dataset_a = dataset_a.map(lambda x: tf.py_function(scale,
                                                   [x], 
                                                   (tf.float32, tf.float32, tf.float32)))
dataset_a = dataset_a.map(set_shape_a)
 
dataset_ones = tf.data.Dataset.from_tensor_slices(tf.ones((len(b), 4, 4, 1)))   

dataset = tf.data.Dataset.zip((dataset_a, (dataset_b, dataset_ones)))

dataset = dataset.cache()
dataset = dataset.batch(2)
dataset = dataset.prefetch(buffer_size=AUTOTUNE)

# Prepare models -------------------- #
INP = (4, 4, 2)
OUT = (16, 16, 2)

first = First(INP)
second = Second(OUT)
model = build_model(first, second)

model.compile(loss=['mse', 'binary_crossentropy'],
              optimizer= tf.keras.optimizers.Adam(learning_rate=1e-4))


train_l, (train_h, train_ones) = next(iter(dataset))


# train ------------------
model.fit(train_l[0],
          [train_h[0], train_ones],
          epochs=2)
              

Update

import matplotlib.pyplot as plt

def rescale(X_scaled, xmin, xmax):
    X = (xmax - xmin) * (X_scaled + 1) / 2.0 + xmin
    return X

class PlotCallback(tf.keras.callbacks.Callback):
    def __init__(self, image, xmin, xmax, model):
        self.image = image
        self.xmin = xmin
        self.xmax = xmax
        self.model = model
        
    def on_epoch_end(self, epoch, logs={}):
        preds = self.model.predict(self.image)
        y_pred = preds[0]
        y_pred = rescale(y_pred, self.xmin, self.xmax)

        
        fig, ax = plt.subplots(figsize=(14, 10))
        ax.imshow(y_pred[0][:, :, 0])
        plt.close()

I am using the functions above, and when I try to fit I would like something like this:

model.fit(
    dataset,
    validation_data=dataset,
    epochs=2,
    callbacks=[PlotCallback(here_the_dataset_a_scaled_values,
                            xmin_from_dataset_a,
                            xmax_from_dataset_b, model)]
)
python-3.x tensorflow machine-learning keras deep-learning
1 Answer

Following the comments above, to solve your issue you can apply a custom mapping function that returns only the target values. Also, see tf.data.Dataset.map for reference.

def set_shape(x, y, z, dims):
    x = tf.reshape(x,  dims)
    y = tf.reshape(y, [1])
    z = tf.reshape(z, [1])
    return x, y, z

dataset_a = dataset_a.map(lambda x, y, z: set_shape(x, y, z, dims=[4, 4, 2]))
dataset_b = dataset_b.map(lambda x, y, z: set_shape(x, y, z, dims=[16, 16, 2]))

def only_scale(x, y, z):
    # keep only the scaled tensor; drop xmin and xmax
    return x

dataset_a = dataset_a.map(only_scale)
dataset_b = dataset_b.map(only_scale)

Zip and batch the data.

dataset = tf.data.Dataset.zip(
    (dataset_a, (dataset_b, dataset_ones))
)
dataset = dataset.cache()
dataset = dataset.batch(2)
dataset = dataset.prefetch(buffer_size=AUTOTUNE)

a, b = next(iter(dataset)) 
a.shape, b[0].shape, b[1].shape
(TensorShape([2, 4, 4, 2]),
 TensorShape([2, 16, 16, 2]),
 TensorShape([2, 4, 4, 1]))

Now we can pass it to the fit method.

# train ------------------
model.fit(
    dataset,
    epochs=2
)

Epoch 1/2
2s 6ms/step - loss: 1.0283 - First_loss: 0.3368 - Second_loss: 0.6914
Epoch 2/2
0s 4ms/step - loss: 1.0228 - First_loss: 0.3367 - Second_loss: 0.6860

Update 1

As mentioned in the comments, .map(only_scale) cannot be used if we need to receive (scale, xmin, xmax) during training in order to rescale later. At the same time, we cannot pass that data format to a model that does not expect such an input specification; in other words, the model code knows nothing about xmin and xmax.

In that case there are two workarounds. One is to use a custom training loop in Keras (a minimal sketch is shown right below for reference); the other is to override the train_step method used by fit. Let's try the second one; with it we don't need the .map(only_scale) step from the data API. Here are the references on overriding the fit method.
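For completeness, a minimal sketch of the first workaround (a hand-written training loop) might look like the following. It is not part of the original answer; it assumes the functional model built above and a dataset rebuilt without the .map(only_scale) step, so each element still carries (scale, xmin, xmax):

import tensorflow as tf

loss_mse = tf.keras.losses.MeanSquaredError()
loss_bce = tf.keras.losses.BinaryCrossentropy()
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

@tf.function
def train_on_batch(x, y):
    # x is (scale, xmin, xmax) from dataset_a; y is ((scale, xmin, xmax), ones)
    x_in = x[0]
    y_scale, y_ones = y[0][0], y[1]
    with tf.GradientTape() as tape:
        gen, disc = model(x_in, training=True)
        loss = loss_mse(y_scale, gen) + loss_bce(y_ones, disc)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return loss

for epoch in range(2):
    for x, y in dataset:
        loss = train_on_batch(x, y)
    print(f"epoch {epoch + 1}: last batch loss = {loss.numpy():.4f}")

In such a loop xmin and xmax (x[1] and x[2]) remain available directly, so they could be logged or used for rescaling on the spot; the rest of this answer sticks with the second workaround.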

Let's build a custom model that overrides train_step (and also test_step for the validation data). FYI, there is also predict_step.

from tensorflow import keras

class CustomFitter(keras.Model):
    def __init__(self, model, **kwargs):
        super().__init__(**kwargs)
        self.model = model 
    
    def call(self, inputs):
        return self.model(inputs)
    
    def unpack(self, data):
        x, y = data
        # x: dataset_a
        # y: (dataset_b, dataset_ones)
        # dataset_a / dataset_b: (scale, xmin, xmax)
        scale_y = y[0][0]
        ones_y = y[1]
        y = (scale_y, ones_y)
        x = x[0]
        return x, y
    
    def train_step(self, data):
        x, y = self.unpack(data)
        return super().train_step((x, y))
    
    def test_step(self, data):
        x, y = self.unpack(data)
        return super().test_step((x, y))

Next, we can do:

model = build_model(first, second)
model = CustomFitter(model)
model.compile(
    loss=['mse', 'binary_crossentropy'],
    optimizer= tf.keras.optimizers.Adam(learning_rate=1e-4)
)

Next, we can now fit the data (without using the only_scale method).

model.fit(
    dataset,
    validation_data=dataset,
    epochs=2
)
Epoch 1/2
45ms/step - loss: 1.0278 - output_1_loss: 0.3358 - output_2_loss: 0.6919 - val_loss: 1.0262 - val_output_1_loss: 0.3357 - val_output_2_loss: 0.6905
Epoch 2/2
8ms/step - loss: 1.0249 - output_1_loss: 0.3356 - output_2_loss: 0.6893 - val_loss: 1.0234 - val_output_1_loss: 0.3355 - val_output_2_loss: 0.6879

Update 2

Regarding using xmin and xmax in the callback to rescale the predicted array and plot it, we can do the following:

  1. We store the xmin and xmax values at training time; here we will store them from the validation dataset.
  2. Later, in the callback, we use these values in on_epoch_end and reset them in on_epoch_begin for the next epoch.

First, we do:

from tensorflow.experimental import numpy as tnp

with tf.device('/CPU:0'):
    scaling_xmin = tf.Variable(
        tnp.empty((0, 1), dtype=tf.float32), shape=[None, 1], trainable=False
    )
    scaling_xmax = tf.Variable(
        tnp.empty((0, 1), dtype=tf.float32), shape=[None, 1], trainable=False
    )
class CustomFitter(keras.Model):
    ...  # __init__ and call as defined above
    
    def unpack(self, data, data_src='valid'):
        x, y = data
        # x: dataset_a
        # y: (dataset_b, dataset_ones)
        # dataset_a / dataset_b: (scale, xmin, xmax)
        
        if data_src == 'valid':
            scaling_xmin.assign(
                tf.concat([scaling_xmin, x[1]], axis=0)
            )
            scaling_xmax.assign(
                tf.concat([scaling_xmax, x[2]], axis=0)
            )
        
        scale_y = y[0][0]
        ones_y = y[1]
        y = (scale_y, ones_y)
        x = x[0]
        return x, y

    def train_step(self, data):
        x, y = self.unpack(data, data_src='train')
        return super().train_step((x,y))
    
    def test_step(self, data):
        x, y = self.unpack(data, data_src='valid')
        return super().test_step((x, y))

Now, in the callback, we will do:

class PlotCallback(tf.keras.callbacks.Callback):
    def __init__(self, image):
        self.image = image
        
    def on_epoch_begin(self, epoch, logs=None):
        scaling_xmin.assign(
            tf.Variable(
                tnp.empty((0,1), dtype=tf.float32), shape=[None,1]
            )
        )
        scaling_xmax.assign(
            tf.Variable(
                tnp.empty((0,1), dtype=tf.float32), shape=[None,1]
            )
        )

    def on_epoch_end(self, epoch, logs={}):
        preds = self.model.predict(self.image)
        y_pred = preds[0]
        
        # assuming y_pred.shape[0] == xmin.shape[0] == xmax.shape[0]
        for yp, xmin, xmax in zip(
            y_pred, scaling_xmin.numpy(), scaling_xmax.numpy()
        ):
            yp = rescale(
                yp, xmin, xmax
            )
            fig, ax = plt.subplots(figsize=(14, 10))
            ax.imshow(yp[:, :, 0])
            break
        plt.show()

Next, we can use this callback. Note that we are passing a raw array as a single input. If PlotCallback(dataset) is passed instead, make sure to implement predict_step, which is almost the same as test_step in the model code above; a minimal sketch is shown below.
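For reference, a hedged sketch of such a predict_step override (extending the CustomFitter class above; not part of the original answer) could look like this:

class CustomFitter(keras.Model):
    ...  # __init__, call, unpack, train_step, test_step as above

    def predict_step(self, data):
        # each element of the zipped dataset is (x, y); keep only the scaled
        # image from dataset_a and drop xmin/xmax before predicting
        x, _ = data
        return self(x[0], training=False)

With this in place, predict (and therefore the callback) can consume the zipped dataset directly.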

a = np.random.random((20, 4, 4, 2)).astype('float32')
custom_model.fit(
    dataset,
    validation_data=dataset,
    callbacks=[PlotCallback(a)],
    epochs=2
)
107ms/step - loss: 1.0251 - output_1_loss: 0.3387 - output_2_loss: 0.6864 - val_loss: 1.0239 - val_output_1_loss: 0.3386 - val_output_2_loss: 0.6853

[plot will be displayed]