将数据输入孪生网络

问题描述 投票:0回答:0

对于我的项目,我决定使用带三元组损失(triplet loss)的 CNN 来学习特征嵌入。我已经对数据做了预处理,生成 72×frames 大小的窗口,并定义了一些函数,用来按批次获取锚点(anchor)、正样本(positive)和负样本(negative)图像。现在我正在尝试构建 CNN,让它从我的数据中学习嵌入向量,并根据三元组损失进行训练。到目前为止,我的代码片段如下:

def get_batches(windows_path=windows_path, beats_path=beats_path, batch_size=batch_size):
    """Randomly pick ``batch_size`` files and build their window/beat paths.

    File names are drawn from the directory listing of ``windows_path`` with
    ``random.choices``, i.e. *with replacement*, so the same file may appear
    more than once in a batch. The same file name is then joined onto
    ``beats_path``, so both directories are expected to contain files with
    matching names.

    Args:
        windows_path: Directory listed to obtain the candidate file names.
        beats_path: Directory holding the beat file for each window file.
        batch_size: Number of file names to draw.

    Returns:
        A tuple ``(windows_batch, beats_batch, batch_files)``: the full paths
        under ``windows_path``, the full paths under ``beats_path``, and the
        bare file names, all in the same order.
    """
    windows_list = os.listdir(windows_path)  # every song's .npy window file
    batch_files = random.choices(windows_list, k=batch_size)
    # The original joined with an undefined ``folder_path``; the window files
    # live in ``windows_path``.
    windows_batch = [os.path.join(windows_path, f) for f in batch_files]
    beats_batch = [os.path.join(beats_path, f) for f in batch_files]
    return windows_batch, beats_batch, batch_files

#get_batch(windows_batch, beats_batch, batch_size, R, frames, delta_pos, delta_negMax, delta_negMin) -> returns the windows, anchors, positives and negatives for my batch
def get_triples_batch(windows_path, batch_files, beats_path, R, frames, delta_pos, delta_negMax, delta_negMin):
    """Build one ``[anchor, positive, negative]`` window triple per file.

    For every name in ``batch_files`` the beat array is loaded from
    ``beats_path`` and passed to ``anchor_beat``, ``positive_beat`` and
    ``negative_beat`` (defined elsewhere), each of which yields a column
    index. Around each index, ``frames`` consecutive columns starting at
    ``index - (frames - 1) // 2`` are copied out of the array loaded from
    ``windows_path``.

    Args:
        windows_path: Directory containing the per-file window arrays.
        batch_files: File names to process; used under both directories.
        beats_path: Directory containing the per-file beat arrays.
        R, delta_pos, delta_negMax, delta_negMin: Forwarded unchanged to the
            beat-selection helpers.
        frames: Number of columns in each extracted window.

    Returns:
        A list with one ``[anchor_window, positive_window, negative_window]``
        entry per file; each window is a float array of shape
        ``(rows_of_loaded_array, frames)``.
    """
    half_span = (frames - 1) // 2
    batch = []
    for name in batch_files:
        beats = np.load(os.path.join(beats_path, name))

        anchor, anchor_index = anchor_beat(beats, R, frames)
        _, positive_index = positive_beat(beats, anchor, delta_pos, R, frames)
        _, negative_index = negative_beat(beats, anchor, delta_negMax, delta_negMin, R, frames)

        song_windows = np.load(os.path.join(windows_path, name))
        n_rows = song_windows.shape[0]

        triple = []
        for center in (anchor_index, positive_index, negative_index):
            # NOTE(review): no bounds check here - a start column below zero
            # is interpreted by NumPy as counting from the end of the array.
            columns = [center - half_span + step for step in range(frames)]
            window = np.zeros((n_rows, frames))
            window[:, :] = song_windows[:, columns]
            triple.append(window)
        batch.append(triple)

    return batch

我就是这样创建批次的,因此神经网络的输入是一个 30 行 3 列的数组,其中每一项都是一个矩阵。

到目前为止我的模型规格如下:

def get_embedding_module(image_array):
    """Build the CNN that embeds one window as a 128-d unit-length vector.

    The network takes a single-channel ``72 x frames`` input (``frames`` is
    read from module scope), applies three 1x1 convolutions interleaved with
    two max-pooling stages, flattens, and projects through two dense layers
    before L2-normalising the result.

    Fixes relative to the previous version:
      * ``keras.Input(shape=...)`` must not contain the batch size; it now
        describes one sample as ``(height, width, channels)``.
      * The function returns the ``keras.Model`` itself. Previously the model
        was immediately called on an intermediate tensor and that call's
        result was returned instead of the model.
      * The redundant ``input_shape`` kwarg on the first ``Conv2D`` is gone;
        the functional API takes the shape from ``inputs``.

    Args:
        image_array: Unused; kept so existing callers keep working.

    Returns:
        A ``keras.Model`` named ``"embedding"`` mapping inputs of shape
        ``(batch, 72, frames, 1)`` to outputs of shape ``(batch, 128)``.
    """
    # One sample is a 72 x frames window with a single channel. The two
    # pooling stages shrink the width by 4 and then 4 again, so ``frames``
    # has to be at least 16 for the network to build.
    inputs = keras.Input(shape=(72, frames, 1))
    x = keras.layers.Conv2D(64, (1, 1), activation='relu')(inputs)
    x = keras.layers.Conv2D(128, (1, 1), activation='relu')(x)
    x = keras.layers.MaxPooling2D(pool_size=(3, 4))(x)
    x = keras.layers.Conv2D(256, (1, 1), activation='relu')(x)
    x = keras.layers.MaxPooling2D(pool_size=(2, 4))(x)
    x = keras.layers.Flatten()(x)
    # Embedding head: non-linear layer followed by a linear projection.
    x = keras.layers.Dense(128, activation='relu')(x)
    x = keras.layers.Dense(128, activation='linear')(x)
    # Normalise each embedding to unit L2 norm along the feature axis.
    outputs = keras.layers.Lambda(lambda t: tf.math.l2_normalize(t, axis=1))(x)

    # Return the model object so it can be applied to each triplet branch.
    embedding = keras.Model(inputs, outputs, name="embedding")
    return embedding

def get_siamese_network(imageSize, embeddingModel):
    """Wire one embedding model onto anchor, positive and negative inputs.

    Three ``keras.Input`` layers named ``"anchor"``, ``"positive"`` and
    ``"negative"`` are created with shape ``imageSize``, and the same
    ``embeddingModel`` instance is applied to each of them, so all three
    branches use the same weights.

    Args:
        imageSize: Per-sample input shape passed to ``keras.Input``.
        embeddingModel: Callable Keras model producing an embedding.

    Returns:
        A ``keras.Model`` whose inputs are ``[anchor, positive, negative]``
        and whose outputs are the three embeddings in that same order.
    """
    branch_inputs = [
        keras.Input(name=role, shape=imageSize)
        for role in ("anchor", "positive", "negative")
    ]
    branch_embeddings = [embeddingModel(branch) for branch in branch_inputs]
    return keras.Model(inputs=branch_inputs, outputs=branch_embeddings)

class SiameseModel(keras.Model):
    """Keras model that trains a three-branch network with a triplet loss.

    The wrapped ``siameseNetwork`` is expected to return three embeddings
    (anchor, positive, negative). The loss for each sample is
    ``max(d(anchor, positive) - d(anchor, negative) + margin, 0)`` where
    ``d`` is the squared Euclidean distance.
    """

    def __init__(self, siameseNetwork, margin, lossTracker):
        """Store the network, the margin and the metric used to track loss."""
        super().__init__()
        self.siameseNetwork = siameseNetwork
        self.margin = margin
        self.lossTracker = lossTracker

    def _compute_distance(self, inputs):
        """Return ``(anchor-positive, anchor-negative)`` squared distances."""
        anchor, positive, negative = inputs
        embedded = self.siameseNetwork((anchor, positive, negative))
        anchor_vec, positive_vec, negative_vec = embedded[0], embedded[1], embedded[2]

        def squared_distance(left, right):
            # Sum of squared differences over the last (feature) axis.
            return tf.reduce_sum(tf.square(left - right), axis=-1)

        return (
            squared_distance(anchor_vec, positive_vec),
            squared_distance(anchor_vec, negative_vec),
        )

    def _compute_loss(self, apDistance, anDistance):
        """Hinge the distance gap at zero after adding the margin."""
        gap = apDistance - anDistance
        return tf.maximum(gap + self.margin, 0.0)

    def call(self, inputs):
        """Forward pass: the two distances for the given triplet batch."""
        ap_dist, an_dist = self._compute_distance(inputs)
        return (ap_dist, an_dist)

    def train_step(self, inputs):
        """Run one optimisation step and report the running loss."""
        with tf.GradientTape() as tape:
            ap_dist, an_dist = self._compute_distance(inputs)
            batch_loss = self._compute_loss(ap_dist, an_dist)

        grads = tape.gradient(batch_loss, self.siameseNetwork.trainable_variables)
        grads_and_vars = zip(grads, self.siameseNetwork.trainable_variables)
        self.optimizer.apply_gradients(grads_and_vars)

        # Fold this batch into the tracked metric and return its current value.
        self.lossTracker.update_state(batch_loss)
        return {"loss": self.lossTracker.result()}

    def test_step(self, inputs):
        """Evaluate the loss on one batch without updating any weights."""
        ap_dist, an_dist = self._compute_distance(inputs)
        batch_loss = self._compute_loss(ap_dist, an_dist)

        self.lossTracker.update_state(batch_loss)
        return {"loss": self.lossTracker.result()}

    @property
    def metrics(self):
        """Metrics exposed to Keras: only the loss tracker."""
        return [self.lossTracker]

class SiameseModel(Model):
    """Keras model that trains a siamese network with a triplet loss.

    The loss per sample is
    ``max(ap_distance - an_distance + margin, 0)``.

    The wrapped network may either return the two distances directly
    ``(ap_distance, an_distance)`` or return the three embeddings
    ``(anchor, positive, negative)``; in the latter case the squared
    Euclidean distances are computed here. The previous version only
    accepted the two-distance form and failed to unpack the three
    embeddings that ``get_siamese_network`` produces.
    """

    def __init__(self, siamese_network, margin=0.5):
        """Store the network and margin and create the mean-loss tracker."""
        super().__init__()
        self.siamese_network = siamese_network
        self.margin = margin
        self.loss_tracker = metrics.Mean(name="loss")

    def call(self, inputs):
        """Forward pass: whatever the wrapped network returns."""
        return self.siamese_network(inputs)

    def train_step(self, data):
        """Run one optimisation step and report the running mean loss."""
        # Record the forward pass so gradients can be taken w.r.t. the
        # wrapped network's weights.
        with tf.GradientTape() as tape:
            loss = self._compute_loss(data)

        gradients = tape.gradient(loss, self.siamese_network.trainable_weights)

        self.optimizer.apply_gradients(
            zip(gradients, self.siamese_network.trainable_weights)
        )

        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def test_step(self, data):
        """Evaluate the loss on one batch without updating any weights."""
        loss = self._compute_loss(data)

        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def _compute_loss(self, data):
        """Return the per-sample triplet loss for one batch."""
        outputs = self.siamese_network(data)

        if isinstance(outputs, (list, tuple)) and len(outputs) == 3:
            # Network returned raw embeddings: derive the squared distances.
            anchor, positive, negative = outputs
            ap_distance = tf.reduce_sum(tf.square(anchor - positive), axis=-1)
            an_distance = tf.reduce_sum(tf.square(anchor - negative), axis=-1)
        else:
            # Network already returns (ap_distance, an_distance).
            ap_distance, an_distance = outputs

        loss = ap_distance - an_distance
        loss = tf.maximum(loss + self.margin, 0.0)
        return loss

    @property
    def metrics(self):
        """Metrics exposed to Keras: only the loss tracker."""
        return [self.loss_tracker]

为了使此功能正常运行,我有一些问题:

  1. 由于我的批次是从多份数据中采样得到的,编写一个固定迭代次数的 for 循环(并在其中训练模型)是否明智?如果是的话,这样的循环应该怎么写?
  2. 我实际上应该如何把数据输入模型?这里的代码主要是我从网上找到后修改的,所以我也不确定它是否适用于我的数据。

另外,为了弄清楚概念:带有某种距离损失的 CNN 和孪生网络(Siamese Network)有什么区别?我读到 Siamese Network 架构由两个相同的神经网络组成,每个神经网络接收一个输入样本,并生成一个固定长度的输出向量,用来表示输入样本的嵌入或特征。但是对于 triplet loss,不是需要 3 个这样的网络吗?那么我对 Siamese Networks 的处理方式是否正确?如果能提供代码片段将非常有帮助!

machine-learning deep-learning conv-neural-network siamese-network triplet
© www.soinside.com 2019 - 2024. All rights reserved.