我正在按照本指南在Keras中实现(变分)自动编码器(VAE)。
VAE 从
topomaps
文件夹读取数据并从 labels
文件夹读取标签。它们都包含 .npy
文件,本质上是存储在磁盘上的 numpy 数组。
我将数据集分为 80% 的训练数据和 20% 的测试数据。具体来说:
x_train
形状是(245760, 40, 40, 1)
y_train
形状是(245760,)
x_test
形状是(61440, 40, 40, 1)
y_test
形状是(61440,)
不幸的是,尽管(训练)
loss
和val_loss
相当低(分别是2.9246e-04
和-4.8249e-04
),如果我目视检查我的VAE的“重建技能”,我可以注意到它们很差,因为重建的图像与原始图像完全不相似:
我使用此配置运行了演示:
latent_dim=25
。该值不得更改epochs=2
batch_size=512
optimizer=Adam()
learning_rate=0.001
我知道
epochs
很小,但这只是一个演示。当我弄清楚为什么我的 VAE 不起作用,即使损失非常低时,我的目标是放大它。
这是运行输出:
Epoch 1/2
2023-08-30 11:35:49.408811: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.
384/384 [==============================] - ETA: 0s - loss: 60.0042 - reconstruction_loss: 11.0072 - kl_loss: 0.09892023-08-30 11:38:40.538661: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.
384/384 [==============================] - 190s 492ms/step - loss: 59.8772 - reconstruction_loss: 11.0072 - kl_loss: 0.0989 - val_loss: 5.5495e-04 - val_reconstruction_loss: 5.4875e-04 - val_kl_loss: 6.1989e-06
Epoch 2/2
384/384 [==============================] - 188s 490ms/step - loss: 3.0879e-04 - reconstruction_loss: 3.0472e-04 - kl_loss: 1.5222e-06 - val_loss: 3.4318e-04 - val_reconstruction_loss: 3.4303e-04 - val_kl_loss: 1.4901e-07
2023-08-30 11:42:17.049 Python[2419:58392] +[CATransaction synchronize] called within transaction
2023-08-30 11:42:26.493 Python[2419:58392] +[CATransaction synchronize] called within transaction
2023-08-30 11:42:37.200 Python[2419:58392] +[CATransaction synchronize] called within transaction
2023-08-30 11:42:41.800433: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.
1920/1920 [==============================] - 29s 15ms/step
2023-08-30 11:43:16.276 Python[2419:58392] +[CATransaction synchronize] called within transaction
Process finished with exit code 0
学习曲线:
这是三个模型类:
class VAE(keras.Model):
def __init__(self, encoder, decoder, **kwargs):
super().__init__(**kwargs)
self.encoder = encoder
self.decoder = decoder
self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
def call(self, inputs, training=None, mask=None):
_, _, z = self.encoder(inputs)
outputs = self.decoder(z)
return outputs
@property
def metrics(self):
return [
self.total_loss_tracker,
self.reconstruction_loss_tracker,
self.kl_loss_tracker,
]
def train_step(self, data):
with tf.GradientTape() as tape:
# Forward pass
z_mean, z_log_var, z = self.encoder(data)
reconstruction = self.decoder(z)
# Compute losses
reconstruction_loss = tf.reduce_mean(
tf.reduce_sum(
keras.losses.binary_crossentropy(data, reconstruction), axis=(1, 2)
)
)
kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
total_loss = reconstruction_loss + kl_loss
# Compute gradient
grads = tape.gradient(total_loss, self.trainable_weights)
# Update weights
self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
# Update my own metrics
self.total_loss_tracker.update_state(total_loss)
self.reconstruction_loss_tracker.update_state(reconstruction_loss)
self.kl_loss_tracker.update_state(kl_loss)
return {
"loss": self.total_loss_tracker.result(),
"reconstruction_loss": self.reconstruction_loss_tracker.result(),
"kl_loss": self.kl_loss_tracker.result(),
}
def test_step(self, data):
# Forward pass
z_mean, z_log_var, z = self.encoder(data)
reconstruction = self.decoder(z)
# Compute losses
reconstruction_loss = tf.reduce_mean(
tf.reduce_sum(
keras.losses.binary_crossentropy(data, reconstruction), axis=(1, 2)
)
)
kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
total_loss = reconstruction_loss + kl_loss
# Update my own metrics
self.total_loss_tracker.update_state(total_loss)
self.reconstruction_loss_tracker.update_state(reconstruction_loss)
self.kl_loss_tracker.update_state(kl_loss)
return {
"loss": self.total_loss_tracker.result(),
"reconstruction_loss": self.reconstruction_loss_tracker.result(),
"kl_loss": self.kl_loss_tracker.result(),
}
class Encoder(keras.Model):
def __init__(self, latent_dimension, input_shape):
super(Encoder, self).__init__()
self.latent_dim = latent_dimension
self.conv_block1 = keras.Sequential([
layers.Input(shape=input_shape),
layers.Conv2D(filters=64, kernel_size=3, activation="relu", strides=2, padding="same"),
layers.BatchNormalization()
])
self.conv_block2 = keras.Sequential([
layers.Conv2D(filters=128, kernel_size=3, activation="relu", strides=2, padding="same"),
layers.BatchNormalization()
])
self.conv_block3 = keras.Sequential([
layers.Conv2D(filters=256, kernel_size=3, activation="relu", strides=2, padding="same"),
layers.BatchNormalization()
])
self.flatten = layers.Flatten()
self.dense = layers.Dense(units=100, activation="relu")
self.z_mean = layers.Dense(latent_dimension, name="z_mean")
self.z_log_var = layers.Dense(latent_dimension, name="z_log_var")
self.sampling = sample
def call(self, inputs, training=None, mask=None):
x = self.conv_block1(inputs)
x = self.conv_block2(x)
x = self.conv_block3(x)
x = self.flatten(x)
x = self.dense(x)
z_mean = self.z_mean(x)
z_log_var = self.z_log_var(x)
z = self.sampling(z_mean, z_log_var)
return z_mean, z_log_var, z
class Decoder(keras.Model):
def __init__(self, latent_dimension):
super(Decoder, self).__init__()
self.latent_dim = latent_dimension
self.dense1 = keras.Sequential([
layers.Dense(units=100, activation="relu"),
layers.BatchNormalization()
])
self.dense2 = keras.Sequential([
layers.Dense(units=1024, activation="relu"),
layers.BatchNormalization()
])
self.dense3 = keras.Sequential([
layers.Dense(units=4096, activation="relu"),
layers.BatchNormalization()
])
self.reshape = layers.Reshape((4, 4, 256))
self.deconv1 = keras.Sequential([
layers.Conv2DTranspose(filters=256, kernel_size=3, activation="relu", strides=2, padding="same"),
layers.BatchNormalization()
])
self.deconv2 = keras.Sequential([
layers.Conv2DTranspose(filters=128, kernel_size=3, activation="relu", strides=1, padding="same"),
layers.BatchNormalization()
])
self.deconv3 = keras.Sequential([
layers.Conv2DTranspose(filters=128, kernel_size=3, activation="relu", strides=2, padding="valid"),
layers.BatchNormalization()
])
self.deconv4 = keras.Sequential([
layers.Conv2DTranspose(filters=64, kernel_size=3, activation="relu", strides=1, padding="valid"),
layers.BatchNormalization()
])
self.deconv5 = keras.Sequential([
layers.Conv2DTranspose(filters=64, kernel_size=3, activation="relu", strides=2, padding="valid"),
layers.BatchNormalization()
])
self.deconv6 = layers.Conv2DTranspose(filters=1, kernel_size=2, activation="sigmoid", padding="valid")
def call(self, inputs, training=None, mask=None):
x = self.dense1(inputs)
x = self.dense2(x)
x = self.dense3(x)
x = self.reshape(x)
x = self.deconv1(x)
x = self.deconv2(x)
x = self.deconv3(x)
x = self.deconv4(x)
x = self.deconv5(x)
decoder_outputs = self.deconv6(x)
return decoder_outputs
请注意,VAE 的实现属于我已经发布的指南。
这是主要功能:
if __name__ == '__main__':
# Load data
x_train, x_test, y_train, y_test = load_data("topomaps", "labels", 0.2)
# Expand dimensions to (None, 40, 40, 1)
x_train = np.expand_dims(x_train, -1)
x_test = np.expand_dims(x_test, -1)
# Print data shapes
print("x_train shape:", x_train.shape)
print("y_train shape:", y_train.shape)
print("x_test shape:", x_test.shape)
print("y_test shape:", y_test.shape)
# Normalize the data
x_train = x_train.astype("float32") / 255.0
x_test = x_test.astype("float32") / 255.0
# Compiling the VAE
latent_dimension = 25 # Do not change
encoder = Encoder(latent_dimension, (40, 40, 1))
decoder = Decoder(latent_dimension)
vae = VAE(encoder, decoder)
vae.compile(Adam(learning_rate=0.001))
# Training
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.2)
print("x_val shape:", x_val.shape)
print("y_val shape:", y_val.shape)
epochs = 2
batch_size = 512
history = vae.fit(x_train, epochs=epochs, batch_size=batch_size, validation_data=(x_val,))
# Plot learning curves
plot_metric(history, "loss")
# Check reconstruction skills against a random test sample
image_index = 5
plt.title(f"Original image {image_index}")
original_image = x_test[image_index]
plt.imshow(original_image, cmap="gray")
plt.show()
plt.title(f"Reconstructed image {image_index}, latent_dim = {latent_dimension}, epochs = {epochs}, "
f"batch_size = {batch_size}")
x_test_reconstructed = vae.predict(x_test)
reconstructed_image = x_test_reconstructed[image_index]
plt.imshow(reconstructed_image, cmap="gray")
plt.show()
这些是我使用的一些功能:
def load_data(topomaps_folder: str, labels_folder: str, test_size):
x, y = _create_dataset(topomaps_folder, labels_folder)
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=test_size)
return x_train, x_test, y_train, y_test
def _create_dataset(topomaps_folder, labels_folder):
topomaps_files = os.listdir(topomaps_folder)
labels_files = os.listdir(labels_folder)
topomaps_files.sort()
labels_files.sort()
x = []
y = []
n_files = len(topomaps_files)
for topomaps_file, labels_file in tqdm(zip(topomaps_files, labels_files), total=n_files, desc="Loading data set"):
topomaps_array = np.load(f"{topomaps_folder}/{topomaps_file}")
labels_array = np.load(f"{labels_folder}/{labels_file}")
if topomaps_array.shape[0] != labels_array.shape[0]:
raise Exception("Shapes must be equal")
for i in range(topomaps_array.shape[0]):
x.append(topomaps_array[i])
y.append(labels_array[i])
x = np.array(x)
y = np.array(y)
return x, y
def sample(z_mean, z_log_var):
batch = tf.shape(z_mean)[0]
dim = tf.shape(z_mean)[1]
epsilon = tf.random.normal(shape=(batch, dim))
stddev = tf.exp(0.5 * z_log_var)
return z_mean + stddev * epsilon
def plot_metric(history, metric):
plt.plot(history.history[metric])
plt.plot(history.history['val_' + metric])
plt.title(metric)
plt.ylabel(metric)
plt.xlabel('epoch')
plt.legend(['train', 'validation'])
plt.show()
编辑
- 你能尝试使用更低的训练集来看看它是否过度拟合并重建已知图像吗? 好的,这就是我在
main
函数中所做的
# Reduce DS size
x_train = x_train[:500]
y_train = y_train[:500]
x_test = x_test[:500]
y_test = y_test[:500]
这就是我得到的:
学习曲线是:
如果我运行相同的配置但设置
epochs=100
我得到:
和损失:
- 不确定您是否做到了...但是您在绘图时是否将输出转换回非标准化值? 这就是我所做的:
plt.title(f"Reconstructed image {image_index}, latent_dim = {latent_dimension}, epochs = {epochs}, "
f"batch_size = {batch_size}")
x_test_reconstructed = vae.predict(x_test)
reconstructed_image = x_test_reconstructed[image_index]
reconstructed_image = reconstructed_image * 255
plt.imshow(reconstructed_image, cmap="gray")
plt.show()
但我还是明白:
- 当批量大小为 4、学习率为 0.0002、100 个 epoch 时,损失图是什么样子? 重建图像全黑,损失曲线为:
我的实施没有出现任何问题。我运行了 VAE 教程的“副本”,并在 1 epoch 后得到了以下结果:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import cv2
import matplotlib.pyplot as plt
class Sampling(layers.Layer):
def call(self, inputs):
z_mean, z_log_var = inputs
batch = tf.shape(z_mean)[0]
dim = tf.shape(z_mean)[1]
epsilon = tf.random.normal(shape=(batch, dim))
return z_mean + tf.exp(0.5 * z_log_var) * epsilon
latent_dim = 25
encoder_inputs = keras.Input(shape=(40, 40, 1))
x = layers.Conv2D(32, 3, activation="relu", strides=2, padding="same")(encoder_inputs)
x = layers.Conv2D(64, 3, activation="relu", strides=2, padding="same")(x)
x = layers.Flatten()(x)
x = layers.Dense(16, activation="relu")(x)
z_mean = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])
encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
encoder.summary()
latent_inputs = keras.Input(shape=(latent_dim,))
x = layers.Dense(6400, activation="relu")(latent_inputs)
x = layers.Reshape((10,10,64))(x)
x = layers.Conv2DTranspose(64, 3, activation="relu", strides=2, padding="same")(x)
x = layers.Conv2DTranspose(32, 3, activation="relu", strides=2, padding="same")(x)
decoder_outputs = layers.Conv2DTranspose(1, 3, activation="sigmoid", padding="same")(x)
decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")
decoder.summary()
class VAE(keras.Model):
def __init__(self, encoder, decoder, **kwargs):
super().__init__(**kwargs)
self.encoder = encoder
self.decoder = decoder
self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
self.reconstruction_loss_tracker = keras.metrics.Mean(
name="reconstruction_loss"
)
self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
@property
def metrics(self):
return [
self.total_loss_tracker,
self.reconstruction_loss_tracker,
self.kl_loss_tracker,
]
def train_step(self, data):
with tf.GradientTape() as tape:
z_mean, z_log_var, z = self.encoder(data)
reconstruction = self.decoder(z)
reconstruction_loss = tf.reduce_mean(
tf.reduce_sum(
keras.losses.binary_crossentropy(data, reconstruction), axis=(1, 2)
)
)
kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
total_loss = reconstruction_loss + kl_loss
grads = tape.gradient(total_loss, self.trainable_weights)
self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
self.total_loss_tracker.update_state(total_loss)
self.reconstruction_loss_tracker.update_state(reconstruction_loss)
self.kl_loss_tracker.update_state(kl_loss)
return {
"loss": self.total_loss_tracker.result(),
"reconstruction_loss": self.reconstruction_loss_tracker.result(),
"kl_loss": self.kl_loss_tracker.result(),
}
def create_data():
x_train = []
data = cv2.imread("IMAGE",0) #used a screen clipping since I do not have your data source
data = cv2.resize(data, (40,40), interpolation=cv2.INTER_AREA) # resize it to the same 40x40 limit
gaussian = np.random.normal(0, 0.2, (data.shape[0],data.shape[1])) #add noise for realism
x_train = np.array([data]*1000) #duplicated with random noise 1000x
x_train[:] = x_train[:]+gaussian
x_test = x_train
return x_train, x_test, data
x_train,x_test , data = create_data()
x = x_train
x = np.expand_dims(x,-1).astype("float32") / 255
vae = VAE(encoder, decoder)
vae.compile(optimizer=keras.optimizers.Adam())
vae.fit(x, epochs=1, batch_size=8)
data_input = np.expand_dims(data,-1).astype("float32")/255
print(data_input.shape)
latent = vae.encoder.predict(x)
print(latent)
output = vae.decoder.predict(latent[0])
digit = output[0]
plt.figure("Source")
plt.imshow(data,cmap="gray")
plt.figure("output")
plt.imshow(digit,cmap='gray')
plt.show()
我没有你的数据,所以我无法检查你的数据加载器是否有任何错误。您可能想检查自动编码器和解码器上的调用函数,但按照教程代码我没有找到您的问题。
我建议对您的数据进行全面调试:
除非我有一些输入数据的样本,否则确实无法正确调试您的代码。我高度怀疑数据输入,因为模型构建代码看起来不错。
问题涉及数据标准化。
x_train = (x_train - np.min(x_train)) / (np.max(x_train) - np.min(x_train))
和
x_test = (x_test - np.min(x_test)) / (np.max(x_test) - np.min(x_test))
应该可以解决问题。