I am trying to convert the PyTorch implementation of Capsule-Forensics to TensorFlow. I thought I had converted the model successfully, since I can compile it and view its summary, shown below.
Model: "model_CapsuleForensics-CustomVGG16_S_UNIWARD_04bpp"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
input (InputLayer) [(None, 256, 256, 1)] 0
_________________________________________________________________
conv0 (Conv2D) (None, 256, 256, 1) 26
_________________________________________________________________
vgg16_block1_conv1 (Conv2D) (None, 256, 256, 64) 640
_________________________________________________________________
vgg16_block1_conv2 (Conv2D) (None, 256, 256, 64) 36928
_________________________________________________________________
vgg16_block1_pool (MaxPoolin (None, 128, 128, 64) 0
_________________________________________________________________
vgg16_block2_conv1 (Conv2D) (None, 128, 128, 128) 73856
_________________________________________________________________
vgg16_block2_conv2 (Conv2D) (None, 128, 128, 128) 147584
_________________________________________________________________
vgg16_block2_pool (MaxPoolin (None, 64, 64, 128) 0
_________________________________________________________________
vgg16_block3_conv1 (Conv2D) (None, 64, 64, 256) 295168
_________________________________________________________________
vgg16_block3_conv2 (Conv2D) (None, 64, 64, 256) 590080
_________________________________________________________________
vgg16_block3_conv3 (Conv2D) (None, 64, 64, 256) 590080
_________________________________________________________________
vgg16_block3_pool (MaxPoolin (None, 32, 32, 256) 0
_________________________________________________________________
primary_caps (PrimaryCaps) (None, 10, 8) 1584210
_________________________________________________________________
class_caps (ClassCaps) (None, 2, 4) 640
_________________________________________________________________
y (Lambda) (None, 2) 0
=================================================================
Total params: 3,319,212
Trainable params: 1,583,070
Non-trainable params: 1,736,142
_________________________________________________________________
However, when I try to train it, I run into this error:
ValueError: Dimensions must be equal, but are 32 and 4 for '{{node margin_loss/mul}} = Mul[T=DT_FLOAT](IteratorGetNext:1, margin_loss/Square)' with input shapes: [32,2], [4,2].
I printed the tensors to debug, and I believe the problem lies in the reshape layer (the View class) at the end of the primary capsules, shown below. The input to View has shape (32, 1, 1), where 32 is the batch size. After reshaping to (-1, 8), the output is (4, 8). Is my implementation of the reshape layer wrong, or is the error caused by the shape of the tensor coming out of the 1D convolutional layers? I have tried setting a fixed batch size, to no avail. Is there perhaps something else I have overlooked?
x reshaped in StatsNet
Tensor("model_CapsuleForensics-CustomVGG16_S_UNIWARD_04bpp/primary_caps/sequential_7/stats_net_7/Reshape:0", shape=(32, 32, 512), dtype=float32)
stats output
Tensor("model_CapsuleForensics-CustomVGG16_S_UNIWARD_04bpp/primary_caps/sequential_7/stats_net_7/stack:0", shape=(32, 2, 32), dtype=float32)
view inputs
Tensor("model_CapsuleForensics-CustomVGG16_S_UNIWARD_04bpp/primary_caps/sequential_7/batch_normalization_31/batchnorm/add_1:0", shape=(32, 1, 1), dtype=float32)
view output
Tensor("model_CapsuleForensics-CustomVGG16_S_UNIWARD_04bpp/primary_caps/sequential_7/view_7/Reshape:0", shape=(4, 8), dtype=float32)
For your reference, I have included the View class from the original Capsule-Forensics in PyTorch, as well as my TensorFlow implementation of the model, including its layers.
The View class of Capsule-Forensics in PyTorch:
class View(nn.Module):
    def __init__(self, *shape):
        super(View, self).__init__()
        self.shape = shape

    def forward(self, input):
        return input.view(self.shape)
The View class in my TensorFlow implementation:
class View(Layer):
    def __init__(self, shape):
        super(View, self).__init__()
        self.shape = shape

    def call(self, inputs):
        print("\nview inputs")
        print(inputs)
        output = tf.reshape(inputs, self.shape)
        print("\nview output")
        print(output)
        return output
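For comparison, a batch-preserving variant could look like the sketch below (my own assumption, not the original author's code): it reads the batch size dynamically and reshapes only the per-sample axes. Note that it would still fail here, because the tensor entering View carries a single value per sample, which points at the axis layout feeding the Conv1D layers rather than at the reshape itself.

import tensorflow as tf
from tensorflow.keras.layers import Layer

class BatchPreservingView(Layer):
    """Hypothetical replacement for View: `shape` is the per-sample shape only."""
    def __init__(self, shape, **kwargs):
        super(BatchPreservingView, self).__init__(**kwargs)
        self.target_shape = list(shape)

    def call(self, inputs):
        batch_size = tf.shape(inputs)[0]  # dynamic batch size
        return tf.reshape(inputs, [batch_size] + self.target_shape)

(tf.keras.layers.Reshape((8,)) behaves the same way, since Keras's Reshape never touches the batch dimension.)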
The StatsNet class in my TF implementation:
class StatsNet(Layer):
    def __init__(self):
        super(StatsNet, self).__init__()

    def call(self, x):
        # Flatten width and channels into one axis: [batch_size, height, width * channels]
        height = x.shape[1]
        width = x.shape[2]
        channels = x.shape[3]
        x_reshaped = tf.reshape(x, [-1, height, width * channels])  # dynamic batch size
        print("\nx reshaped in StatsNet")
        print(x_reshaped)

        # Calculate mean and standard deviation along the last dimension
        mean = tf.reduce_mean(x_reshaped, axis=-1)
        std = tf.math.reduce_std(x_reshaped, axis=-1)

        # Stack mean and std along a new dimension
        stats = tf.stack([mean, std], axis=1)
        print("\nstats output")
        print(stats)
        return stats
The primary capsule and class capsule layers in my TF implementation. They correspond to the FeatureExtractor and RoutingLayer classes of the original PyTorch implementation, respectively:
class PrimaryCaps(Layer):
    def __init__(self, num_capsules):
        super(PrimaryCaps, self).__init__()
        self.num_capsules = num_capsules

    def get_config(self):
        config = super().get_config().copy()
        config.update(
            {
                "num_capsules": self.num_capsules,
            }
        )
        return config

    def build(self, input_shape):
        self.capsules = [
            self.create_capsule() for _ in range(self.num_capsules)
        ]
        self.built = True

    def create_capsule(self):
        return Sequential([
            Conv2D(64, kernel_size=3, strides=1, padding='same', kernel_initializer=initializers.RandomNormal(mean=0.0, stddev=0.02)),
            BatchNormalization(),
            ReLU(),
            Conv2D(16, kernel_size=3, strides=1, padding='same', kernel_initializer=initializers.RandomNormal(mean=0.0, stddev=0.02)),
            BatchNormalization(beta_initializer='zeros', gamma_initializer=initializers.RandomNormal(mean=1.0, stddev=0.02)),
            ReLU(),
            StatsNet(),
            Conv1D(8, kernel_size=5, strides=2, padding='same', kernel_initializer=initializers.RandomNormal(mean=0.0, stddev=0.02)),
            BatchNormalization(beta_initializer='zeros', gamma_initializer=initializers.RandomNormal(mean=1.0, stddev=0.02)),
            Conv1D(1, kernel_size=3, strides=1, padding='same', kernel_initializer=initializers.RandomNormal(mean=0.0, stddev=0.02)),
            BatchNormalization(beta_initializer='zeros', gamma_initializer=initializers.RandomNormal(mean=1.0, stddev=0.02)),
            View(shape=(-1, 8)),
        ])

    def squash(self, tensor, axis):
        squared_norm = tf.reduce_sum(tf.square(tensor), axis=axis, keepdims=True)
        scale = squared_norm / (1 + squared_norm)
        return scale * tensor / tf.sqrt(squared_norm)

    def call(self, inputs):
        outputs = [capsule(inputs) for capsule in self.capsules]
        print("\noutput of primary capsules")
        print(outputs)
        output = tf.stack(outputs, axis=-1)
        # output: [batch_size, data, in_caps]
        output = tf.transpose(output, perm=[0, 2, 1])
        # output: [batch_size, in_caps, data]
        print("\noutput of primary capsule layer")
        print(output)
        return self.squash(output, axis=-1)
class ClassCaps(Layer):
    def __init__(self, num_input_capsules, num_output_capsules, data_in, data_out, num_iterations, dropout_rate=0.05):
        super(ClassCaps, self).__init__()
        self.num_iterations = num_iterations
        self.route_weights = tf.Variable(tf.random.normal(shape=(num_output_capsules, num_input_capsules, data_out, data_in), stddev=0.01))
        self.dropout_rate = dropout_rate

    def get_config(self):
        config = super().get_config().copy()
        config.update(
            {
                "num_iterations": self.num_iterations,
                "route_weights": self.route_weights,
                "dropout_rate": self.dropout_rate
            }
        )
        return config

    def squash(self, tensor, axis):
        squared_norm = tf.reduce_sum(tf.square(tensor), axis=axis, keepdims=True)
        scale = squared_norm / (1 + squared_norm)
        return scale * tensor / tf.sqrt(squared_norm)

    def call(self, x, random=True):
        # x: [batch_size, in_caps, data]
        print("\nx input")
        print(x)

        if random:
            noise = tf.random.normal(self.route_weights.shape, stddev=0.01)
            route_weights = self.route_weights + noise
        else:
            route_weights = self.route_weights
        # route_weights: [out_caps, in_caps, data_out, data_in]
        print("\nroute_weights")
        print(route_weights)

        # priors = route_weights[:, None, :, :, :] @ x[None, :, :, :, None]
        # # route_weights [out_caps, 1, in_caps, data_out, data_in]
        # # x             [1,        b, in_caps, data_in,  1      ]
        # # priors        [out_caps, b, in_caps, data_out, 1      ]
        # route: 2 10 4 8, should be 2 1 10 4 8
        # x: b 10 8, should be 1 b 10 8 1
        print("\nbefore matmul")
        print("route_weights times x")
        print(route_weights[:, None, :, :, :], x[None, :, :, :, None])

        priors = tf.matmul(route_weights[:, None, :, :, :], x[None, :, :, :, None])
        # priors: [out_caps, batch_size, in_caps, data_out, 1]
        print("\npriors, after matmul")
        print(priors)

        priors = tf.transpose(priors, perm=[1, 0, 2, 3, 4])
        # priors: [batch_size, out_caps, in_caps, data_out, 1]
        print("\npriors transposed")
        print(priors)

        if self.dropout_rate > 0.0:
            drop = tf.cast(tf.random.uniform(tf.shape(priors)) > self.dropout_rate, dtype=tf.float32)
            priors = priors * drop

        logits = tf.zeros_like(priors)
        # logits: [batch_size, out_caps, in_caps, data_out, 1]
        print("\nlogits")
        print(logits)

        for i in range(self.num_iterations):
            probs = tf.nn.softmax(logits, axis=2)
            outputs = self.squash(tf.reduce_sum(probs * priors, axis=2, keepdims=True), axis=3)
            if i != self.num_iterations - 1:
                delta_logits = priors * outputs
                logits = logits + delta_logits

        print("\noutputs after dynamic routing")
        print(outputs)
        # outputs: [batch_size, out_caps, 1, data_out, 1]

        outputs = tf.squeeze(outputs, [2, 4])
        print("\noutputs after squeeze")
        print(outputs)

        # if len(outputs.shape) == 3:
        #     outputs = tf.transpose(outputs, perm=[0, 2, 1])
        # else:
        #     outputs = tf.expand_dims(outputs, axis=0)
        #     outputs = tf.transpose(outputs, perm=[0, 2, 1])

        # Do not transpose
        if len(outputs.shape) == 3:
            pass
        else:
            outputs = tf.expand_dims(outputs, axis=0)
        print("\noutputs of class capsule")
        print(outputs)
        return outputs
The TF implementation of my model, which uses a customized VGG16 for feature extraction:
def CapsuleForensics(input_shape, n_class, name="CapsuleForensics"):
    tf.keras.backend.clear_session()

    # --- Encoder ---
    # Input
    x = Input(shape=input_shape, name='input')

    # Noise enhancement using HPF
    conv0 = tf.keras.layers.Conv2D(1, (5, 5), strides=(1, 1), padding='same', activation=None, kernel_initializer=HighPassFilterInitializer(F0), trainable=False, name='conv0')(x)

    # Custom VGG16
    # Block 1
    vgg16_block1_conv1 = Conv2D(64, (3, 3), weights=new_block1_conv1, activation='relu', padding='same', trainable=False, name='vgg16_block1_conv1', use_bias=True)(conv0)
    vgg16_block1_conv2 = Conv2D(64, (3, 3), weights=vgg16_weights['vgg16_block1_conv2'], activation='relu', padding='same', trainable=False, name='vgg16_block1_conv2', use_bias=True)(vgg16_block1_conv1)
    vgg16_block1_pool = MaxPooling2D((2, 2), strides=(2, 2), name='vgg16_block1_pool')(vgg16_block1_conv2)

    # Block 2
    vgg16_block2_conv1 = Conv2D(128, (3, 3), weights=vgg16_weights['vgg16_block2_conv1'], activation='relu', padding='same', trainable=False, name='vgg16_block2_conv1', use_bias=True)(vgg16_block1_pool)
    vgg16_block2_conv2 = Conv2D(128, (3, 3), weights=vgg16_weights['vgg16_block2_conv2'], activation='relu', padding='same', trainable=False, name='vgg16_block2_conv2', use_bias=True)(vgg16_block2_conv1)
    vgg16_block2_pool = MaxPooling2D((2, 2), strides=(2, 2), name='vgg16_block2_pool')(vgg16_block2_conv2)

    # Block 3
    vgg16_block3_conv1 = Conv2D(256, (3, 3), weights=vgg16_weights['vgg16_block3_conv1'], activation='relu', padding='same', trainable=False, name='vgg16_block3_conv1', use_bias=True)(vgg16_block2_pool)
    vgg16_block3_conv2 = Conv2D(256, (3, 3), weights=vgg16_weights['vgg16_block3_conv2'], activation='relu', padding='same', trainable=False, name='vgg16_block3_conv2', use_bias=True)(vgg16_block3_conv1)
    vgg16_block3_conv3 = Conv2D(256, (3, 3), weights=vgg16_weights['vgg16_block3_conv3'], activation='relu', padding='same', trainable=False, name='vgg16_block3_conv3', use_bias=True)(vgg16_block3_conv2)
    vgg16_block3_pool = MaxPooling2D((2, 2), strides=(2, 2), name='vgg16_block3_pool')(vgg16_block3_conv3)

    primary_caps = PrimaryCaps(num_capsules=10)(vgg16_block3_pool)
    class_caps = ClassCaps(num_input_capsules=10, num_output_capsules=n_class, data_in=8, data_out=4, num_iterations=2, dropout_rate=0.05)(primary_caps)

    # y = Softmax(axis=-1, name='out')(class_caps)
    # Convenience layer to compute the capsule vectors' lengths (from the CapsNet-Keras implementation), used as the final class probabilities
    y = Lambda(lambda x: tf.sqrt(tf.reduce_sum(tf.square(x), axis=-1)), name="y")(class_caps)

    model = Model(inputs=[x], outputs=[y], name=name)
    return model
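As a quick, self-contained illustration of why the summary builds but training fails (a sketch, assuming a CapsNet-style margin loss; the traceback's margin_loss/mul multiplies the labels from IteratorGetNext with a Square term): once the prediction loses its batch dimension, the loss sees exactly the shape mismatch from the error above.

import tensorflow as tf

y_true = tf.zeros((32, 2))  # one-hot labels for a batch of 32
y_pred = tf.zeros((4, 2))   # what currently reaches the loss after the reshape collapse
try:
    _ = y_true * tf.square(tf.maximum(0.0, 0.9 - y_pred))  # margin-loss-style term (assumed)
except tf.errors.InvalidArgumentError as err:
    print(err)  # same [32,2] vs [4,2] mismatch that graph mode reports as a ValueError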
Any input is greatly appreciated. Thank you very much.
I generally find it hard to hunt for bugs in a model definition this long. There is an easier way: print the weight names and shapes of both models and see for yourself where they diverge.
Here are some rough snippets:
# Keras
[(weight.name, weight.shape) for layer in model.layers for weight in layer.weights]

# PyTorch
[(name, tensor.shape) for name, tensor in model.state_dict().items()]
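A slightly fuller version of the same idea (a sketch; tf_model and torch_model stand in for your two models) that prints name/shape pairs for both so you can eyeball where they start to differ:

def keras_weight_shapes(model):
    return [(w.name, tuple(w.shape)) for layer in model.layers for w in layer.weights]

def torch_weight_shapes(model):
    return [(name, tuple(t.shape)) for name, t in model.state_dict().items()]

for row in keras_weight_shapes(tf_model):     # tf_model: your Keras model
    print(row)
for row in torch_weight_shapes(torch_model):  # torch_model: the original PyTorch model
    print(row)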