对于我的项目,我决定使用带有三元组损失(triplet loss)的 CNN 进行特征嵌入。我已经对数据进行了预处理,生成大小为 72×frames 的窗口,并且定义了一些函数,用于按批次获取锚点(anchor)、正样本(positive)和负样本(negative)图像。我现在正尝试构建一个 CNN,让它基于三元组损失从我的数据中学习嵌入向量。到目前为止,我的代码片段如下:
def get_batches(windows_path=windows_path, beats_path=beats_path, batch_size=batch_size):
    """Pick a random batch of song files and build their window/beat paths.

    Args:
        windows_path: Directory that is listed to discover the available
            files (one ``.npy`` windows file per song).
        beats_path: Directory joined with the same file names to locate the
            matching beat files.
        batch_size: Number of file names to draw.

    Returns:
        A tuple ``(windows_batch, beats_batch, batch_files)``: the full paths
        under ``windows_path``, the full paths under ``beats_path``, and the
        bare file names that were drawn.

    Raises:
        IndexError: If ``windows_path`` contains no entries
            (``random.choices`` cannot sample from an empty sequence).
    """
    windows_list = os.listdir(windows_path)  # all songs and their npy windows
    # random.choices samples WITH replacement, so a file can appear twice.
    batch_files = random.choices(windows_list, k=batch_size)
    # Join against the directory that was actually listed; the previous code
    # used `folder_path`, which is not a parameter of this function.
    windows_batch = [os.path.join(windows_path, f) for f in batch_files]
    beats_batch = [os.path.join(beats_path, f) for f in batch_files]
    return windows_batch, beats_batch, batch_files
#get_batch(windows_batch, beats_batch, batch_size, R, frames, delta_pos, delta_negMax, delta_negMin) -> returns the windows, anchors, positives and negatives for my batch
def get_triples_batch(windows_path, batch_files, beats_path, R, frames, delta_pos, delta_negMax, delta_negMin):
    """Cut an (anchor, positive, negative) window triple out of every file.

    For each name in ``batch_files`` the beat array is loaded from
    ``beats_path`` and handed to ``anchor_beat``, ``positive_beat`` and
    ``negative_beat``; the second element of each helper's result is used as
    the centre column of a ``frames``-wide window taken from the array loaded
    from ``windows_path``.

    Args:
        windows_path: Directory holding the per-file window arrays.
        batch_files: File names to process (the same name is used in both
            directories).
        beats_path: Directory holding the per-file beat arrays.
        R, delta_pos, delta_negMax, delta_negMin: Forwarded unchanged to the
            beat-selection helpers.
        frames: Width (number of columns) of every extracted window.

    Returns:
        A list with one ``[anchor_window, positive_window, negative_window]``
        entry per file; each window is a float array of shape
        ``(rows_of_loaded_array, frames)``.
    """
    # Columns to the left of the centre index that belong to the window.
    half_span = (frames - 1) // 2
    triples = []
    for file in batch_files:
        beats = np.load(os.path.join(beats_path, file))
        anchor, anchor_index = anchor_beat(beats, R, frames)
        _, positive_index = positive_beat(beats, anchor, delta_pos, R, frames)
        _, negative_index = negative_beat(beats, anchor, delta_negMax, delta_negMin, R, frames)

        source = np.load(os.path.join(windows_path, file))
        n_rows = source.shape[0]

        triple = []
        for center in (anchor_index, positive_index, negative_index):
            window = np.zeros((n_rows, frames))
            first_column = center - half_span
            # Copy column by column so indexing behaves exactly like plain
            # integer indexing (including how out-of-range columns fail).
            for offset in range(frames):
                window[:, offset] = source[:, first_column + offset]
            triple.append(window)
        triples.append(triple)
    return triples
我就是这样创建批次的,因此神经网络的输入是一个 30 行 3 列的数组,其中每一项都是一个矩阵。
到目前为止,我的模型定义如下:
def get_embedding_module(image_array):
    """Build the CNN that maps one 72 x ``frames`` window to a 128-d embedding.

    The network stacks 1x1 convolutions and max-pooling, flattens, applies two
    dense layers and finally L2-normalises the result so every embedding has
    unit length.

    Args:
        image_array: Currently unused; kept so existing callers keep working.

    Returns:
        A ``keras.Model`` named ``"embedding"`` that takes a batch of
        ``(72, frames)`` windows and returns ``(batch, 128)`` embeddings.

    Note:
        ``frames`` is read from module scope. The two pooling layers divide
        the frame axis by 4 twice, so ``frames`` must be at least 16.
    """
    # Keras input shapes never include the batch axis; the batch size is
    # decided by whatever is fed to the model at fit/predict time.
    inputs = keras.Input(shape=(72, frames))
    # Conv2D expects (height, width, channels): add a single channel axis so
    # the 2-D windows can be fed to the model as they are.
    x = keras.layers.Reshape((72, frames, 1))(inputs)
    x = keras.layers.Conv2D(64, (1, 1), activation='relu')(x)
    x = keras.layers.Conv2D(128, (1, 1), activation='relu')(x)
    x = keras.layers.MaxPooling2D(pool_size=(3, 4))(x)
    x = keras.layers.Conv2D(256, (1, 1), activation='relu')(x)
    x = keras.layers.MaxPooling2D(pool_size=(2, 4))(x)
    x = keras.layers.Flatten()(x)
    # Embedding layers
    x = keras.layers.Dense(128, activation='relu')(x)
    x = keras.layers.Dense(128, activation='linear')(x)
    # L2 normalisation along the feature axis
    outputs = keras.layers.Lambda(lambda t: tf.math.l2_normalize(t, axis=1))(x)
    # Return the model object itself. The previous code called the new model
    # on the intermediate tensor `x`, which is not a valid model input and
    # would have returned a tensor instead of a reusable model.
    embedding = keras.Model(inputs, outputs, name="embedding")
    return embedding
def get_siamese_network(imageSize, embeddingModel):
    """Wire one shared embedding model to three named inputs.

    Args:
        imageSize: Shape (without batch axis) used for each of the three
            input layers.
        embeddingModel: Callable model applied to every input; the same
            instance is used for all three branches, so its weights are
            shared.

    Returns:
        A ``keras.Model`` whose inputs are the ``"anchor"``, ``"positive"``
        and ``"negative"`` layers (in that order) and whose outputs are the
        corresponding three embeddings.
    """
    branch_names = ("anchor", "positive", "negative")
    # One input layer per branch, created in anchor/positive/negative order.
    branch_inputs = [keras.Input(name=branch, shape=imageSize) for branch in branch_names]
    # Re-using the very same model object is what makes the branches share weights.
    branch_embeddings = [embeddingModel(tensor) for tensor in branch_inputs]
    return keras.Model(inputs=branch_inputs, outputs=branch_embeddings)
class SiameseModel(keras.Model):
    """Keras model that trains a three-branch network with a triplet loss.

    The wrapped network is expected to return the anchor, positive and
    negative embeddings (indices 0, 1 and 2 of its output). The loss is
    ``max(d(anchor, positive) - d(anchor, negative) + margin, 0)`` where
    ``d`` is the squared Euclidean distance.

    NOTE(review): another class with this same name is defined further down
    in the same source; if both live in one module the later one replaces
    this one.
    """

    def __init__(self, siameseNetwork, margin, lossTracker):
        """Store the wrapped network, the triplet margin and the loss metric."""
        super().__init__()
        self.siameseNetwork = siameseNetwork
        self.margin = margin
        self.lossTracker = lossTracker

    def _compute_distance(self, inputs):
        """Return ``(anchor-positive, anchor-negative)`` squared distances."""
        anchor, positive, negative = inputs
        embeddings = self.siameseNetwork((anchor, positive, negative))
        anchor_vec = embeddings[0]
        positive_vec = embeddings[1]
        negative_vec = embeddings[2]

        def squared_l2(other_vec):
            # Sum of squared differences over the embedding axis.
            return tf.reduce_sum(tf.square(anchor_vec - other_vec), axis=-1)

        return (squared_l2(positive_vec), squared_l2(negative_vec))

    def _compute_loss(self, apDistance, anDistance):
        """Return the per-sample hinge-style triplet loss."""
        return tf.maximum(apDistance - anDistance + self.margin, 0.0)

    def call(self, inputs):
        """Forward pass: the two distances for a triple of inputs."""
        ap_dist, an_dist = self._compute_distance(inputs)
        return (ap_dist, an_dist)

    def train_step(self, inputs):
        """Run one optimisation step and report the running mean loss."""
        trainable = self.siameseNetwork.trainable_variables
        with tf.GradientTape() as tape:
            ap_dist, an_dist = self._compute_distance(inputs)
            loss = self._compute_loss(ap_dist, an_dist)
        grads = tape.gradient(loss, trainable)
        self.optimizer.apply_gradients(zip(grads, trainable))
        # Track the loss so Keras can display and reset it per epoch.
        self.lossTracker.update_state(loss)
        return {"loss": self.lossTracker.result()}

    def test_step(self, inputs):
        """Evaluate the loss on one batch without updating any weights."""
        loss = self._compute_loss(*self._compute_distance(inputs))
        self.lossTracker.update_state(loss)
        return {"loss": self.lossTracker.result()}

    @property
    def metrics(self):
        """Metrics Keras should reset at the start of every epoch."""
        return [self.lossTracker]
class SiameseModel(Model):
    """Keras model with a custom training loop for the triplet loss.

    The loss is ``max(ap_distance - an_distance + margin, 0)``.

    The wrapped network may return either of two forms:

    * two tensors ``(ap_distance, an_distance)``, i.e. it already contains a
      distance layer, or
    * three tensors ``(anchor, positive, negative)`` embeddings, in which case
      the squared Euclidean distances are computed here.

    Args:
        siamese_network: Model called on each batch of triples.
        margin: Minimum gap enforced between the positive and the negative
            distance. Defaults to 0.5.
    """

    def __init__(self, siamese_network, margin=0.5):
        super().__init__()
        self.siamese_network = siamese_network
        self.margin = margin
        self.loss_tracker = metrics.Mean(name="loss")

    def call(self, inputs):
        """Forward pass: delegate to the wrapped network."""
        return self.siamese_network(inputs)

    def train_step(self, data):
        """Run one optimisation step and report the running mean loss."""
        # The forward pass must happen inside the tape so it is recorded.
        with tf.GradientTape() as tape:
            loss = self._compute_loss(data)
        gradients = tape.gradient(loss, self.siamese_network.trainable_weights)
        self.optimizer.apply_gradients(
            zip(gradients, self.siamese_network.trainable_weights)
        )
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def test_step(self, data):
        """Evaluate the loss on one batch without updating any weights."""
        loss = self._compute_loss(data)
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def _compute_loss(self, data):
        """Return the per-sample triplet loss for one batch of triples."""
        outputs = self.siamese_network(data)
        if len(outputs) == 3:
            # Network returned raw embeddings: derive the two distances here
            # instead of failing on a two-value unpack.
            anchor, positive, negative = outputs
            ap_distance = tf.reduce_sum(tf.square(anchor - positive), axis=-1)
            an_distance = tf.reduce_sum(tf.square(anchor - negative), axis=-1)
        else:
            # Network already returns (ap_distance, an_distance).
            ap_distance, an_distance = outputs
        loss = ap_distance - an_distance
        loss = tf.maximum(loss + self.margin, 0.0)
        return loss

    @property
    def metrics(self):
        """Metrics Keras should reset at the start of every epoch."""
        return [self.loss_tracker]
为了让这段代码正常运行,我有一些问题:
另外,为了澄清概念:带有某种距离损失的 CNN 与孪生网络(Siamese Network)有什么区别?我读到孪生网络架构由两个相同的神经网络组成,每个网络接收一个输入样本,并生成一个固定长度的输出向量,该向量表示输入样本的嵌入或特征。但是对于三元组损失(triplet loss),不是需要 3 个网络吗?那么我处理孪生网络的方式是否正确?如果能提供代码片段将非常有帮助!