How do I mask variable-sized inputs in a transformer model when each batch needs to be masked differently?

Problem description · Votes: 0 · Answers: 3

I'm building a transformer with tensorflow.keras, but I'm having trouble understanding how the attention_mask works for the MultiHeadAttention layer.

My input is 3-dimensional data. For example, say my whole dataset has 10 items, each with a length of at most 4:

# whole data
[
  # first item
  [
    [     1,      2,      3],
    [     1,      2,      3],
    [np.nan, np.nan, np.nan],
    [np.nan, np.nan, np.nan],
  ],
  # second item
  [
    [     1,      2,      3],
    [     5,      8,      2],
    [     3,      7,      8],
    [     4,      6,      2],
  ],
  ... # 8 more items
]

So my mask looks like this:

# assume this is a numpy array
mask = [
  [
    [1, 1, 1],
    [1, 1, 1],
    [0, 0, 0],
    [0, 0, 0],
  ],
  [
    [1, 1, 1],
    [1, 1, 1],
    [1, 1, 1],
    [1, 1, 1],
  ],
  ...
]

So far the mask has shape [10, 4, 3]. Let's say I use batch_size = 5. Now, according to the documentation, the attention_mask shape should be [B, T, S] (batch_size, query_size, key_size). In this example, should it be [5, 4, 4]?

Questions

If the mask is only computed once, which 5 items should I use for it? That sounds counterintuitive to me. How should I build the mask?

According to this answer, the head_size should also be taken into account, so they also do:

mask = mask[:, tf.newaxis, tf.newaxis, :]
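
As far as I understand it, the extra axes are only there for broadcasting. A minimal shape check of my own (assuming a per-token key mask of shape (batch, seq_len), which is not exactly my mask) looks like this:

import numpy as np
import tensorflow as tf

batch_size, seq_len, num_heads = 5, 4, 2

# hypothetical per-token key mask: 1 = real timestep, 0 = padding
key_mask = np.array([[1, 1, 0, 0]] * batch_size)            # (5, 4)

# add singleton axes for the head and query dimensions
broadcast_mask = key_mask[:, tf.newaxis, tf.newaxis, :]     # (5, 1, 1, 4)

# per-head attention scores have shape (batch, num_heads, T, S),
# so this mask broadcasts over all heads and all query positions
scores_shape = (batch_size, num_heads, seq_len, seq_len)
print(np.broadcast_shapes(broadcast_mask.shape, scores_shape))  # (5, 2, 4, 4)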

What I have tested

The only time I managed to run the transformer successfully with an attention_mask was with:

mask = np.ones((batch_size, data.shape[1], data.shape[2]))
mask = mask[:, tf.newaxis, tf.newaxis, :]

Obviously this mask doesn't make any sense, since it is all ones, but it was only to test whether it had the right shape.

Model

I'm using almost the same code as the keras example transformer for time-series classification:

def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0.0, mask=None):
    # Normalization and Attention
    x = layers.LayerNormalization(epsilon=1e-6)(inputs)
    x = layers.MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )(x, x, attention_mask=mask)
    x = layers.Dropout(dropout)(x)
    res = x + inputs

    # Feed Forward Part
    x = layers.LayerNormalization(epsilon=1e-6)(res)
    x = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(x)
    x = layers.Dropout(dropout)(x)
    x = layers.Conv1D(filters=inputs.shape[-1], kernel_size=1)(x)
    return x + res


def build_model(
    n_classes,
    input_shape,
    head_size,
    num_heads,
    ff_dim,
    num_transformer_blocks,
    mlp_units,
    dropout=0.0,
    mlp_dropout=0.0,
    input_mask=None,
) -> keras.Model:
    inputs = keras.Input(shape=input_shape)
    x = inputs
    for _ in range(num_transformer_blocks):
        x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout, input_mask)

    x = layers.GlobalAveragePooling2D(data_format="channels_first")(x)
    for dim in mlp_units:
        x = layers.Dense(dim, activation="relu")(x)
        x = layers.Dropout(mlp_dropout)(x)
    outputs = layers.Dense(n_classes, activation="softmax")(x)
    return keras.Model(inputs, outputs)
Tags: python · numpy · tensorflow · keras · transformer-model
3 Answers
1 vote

First, a simpler example to understand the MultiHeadAttention mask.

#Crude Self attention implementation
import numpy as np
import tensorflow as tf

query = tf.constant([[1], [2], [3], [4]], dtype=tf.float32)  #Shape([4, 1])

scores = tf.matmul(query, query, transpose_b=True) #Shape([4, 4])
#unnormalized, presoftmax score

These are the attention scores for the given query. When you want to prevent attention to certain positions in those scores, you use the attention_mask, so the mask dimensions should match the attention-score dimensions.

Suppose we decide that in the example above each token should only attend to itself and the next token; then we can define the mask as:

mask = tf.constant([[1., 1., -np.inf, -np.inf],
        [-np.inf, 1., 1. ,-np.inf],
        [-np.inf, -np.inf, 1., 1.],
        [-np.inf, -np.inf, -np.inf, 1.]])

#apply mask on the score
scores = scores*mask

#softmax 
scores = tf.nn.softmax(scores)

#scores, ( 0 indicates no attention) 
[[0.26894143, 0.73105854, 0.        , 0.        ],
 [0.        , 0.11920292, 0.880797  , 0.        ],
 [0.        , 0.        , 0.04742587, 0.95257413],
 [0.        , 0.        , 0.        , 1.        ]]

#score weighted queries
value = tf.matmul(scores, query)

#value is a weighted average of the current and next token of ( [[1], [2], [3], [4]])
[[1.7310585], #weighted average of ([1], [2]) (current and next)
 [2.8807971],
 [3.9525743],
 [4.       ]]

Can each item in the batch have a different mask?

Yes. One use case I can think of is when different samples in the same batch have different amounts of padding, so the mask can be set to ignore that padding.

Your particular case: the mask has to be (batch_size, 4, 4). The mask can be the same for every item in the batch.

batch_size = 5
query = keras.Input(shape=(4, 3))
mask_tensor = keras.Input(shape=(4, 4))

#keras layer
mha = keras.layers.MultiHeadAttention(num_heads=1, key_dim=3)
output = mha(query=query, value=query, attention_mask=mask_tensor, return_attention_scores=True)

#Create a model
model = keras.Model([query, mask_tensor], output)

#random query and mask. Note the mask needs to be (1:attention or 0:no attention) 
queries = tf.random.normal(shape=(batch_size, 4, 3))
mask_data = tf.random.uniform(maxval=2, shape=(batch_size, 4, 4), dtype=tf.int32)

#calling the model
values, attn_weights = model.predict([queries, mask_data])

#attn_weights.shape
(5, 1, 4, 4)
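
One possible way to build such a (batch_size, 4, 4) mask from data that is padded with NaN rows, as in the question, is sketched below (a minimal sketch, not tested against your full pipeline): position (i, j) is attendable only if both query timestep i and key timestep j are real.

import numpy as np
import tensorflow as tf

# two items as in the question: the first one is padded with NaN rows
data = np.array([
    [[1, 2, 3], [1, 2, 3], [np.nan] * 3, [np.nan] * 3],
    [[1, 2, 3], [5, 8, 2], [3, 7, 8], [4, 6, 2]],
], dtype=np.float32)                                   # (2, 4, 3)

# a timestep is real if it contains no NaN -> (batch, 4)
valid = ~np.isnan(data).any(axis=-1)

# outer AND per item -> (batch, 4, 4)
attention_mask = valid[:, :, np.newaxis] & valid[:, np.newaxis, :]
attention_mask = tf.cast(attention_mask, tf.int32)

print(attention_mask[0].numpy())
# [[1 1 0 0]
#  [1 1 0 0]
#  [0 0 0 0]
#  [0 0 0 0]]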

0 votes

After some research and after looking at several transformer-model examples, this is what solved it for me:

  1. Create a custom TransformerBlock layer that supports masking.
  2. Add a mask argument to the call method of the TransformerBlock and reshape it there.
  3. Add a Masking layer before the TransformerBlock.
The code:

class TransformerBlock(layers.Layer):
    def __init__(self, head_size, num_heads, ff_dim, ff_dim2, rate=0.1):
        super().__init__()
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=head_size)
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)
        self.conv1 = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")
        self.conv2 = layers.Conv1D(filters=ff_dim2, kernel_size=1)
        self.supports_masking = True

    def call(self, inputs, training, mask=None):
        padding_mask = None
        if mask is not None:
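            # `mask` arrives from the Masking layer as a (batch, seq_len) boolean
            # tensor; the two added axes make it (batch, 1, 1, seq_len) so that
            # the same key padding applies to every head and every query position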
            padding_mask = tf.cast(mask[:, tf.newaxis, tf.newaxis, :], dtype="int32")

        out_norm1 = self.layernorm1(inputs, training=training)
        out_att = self.att(
            out_norm1, out_norm1, training=training, attention_mask=padding_mask
        )
        out_drop1 = self.dropout1(out_att, training=training)
        res = out_drop1 + inputs
        out_norm2 = self.layernorm2(res, training=training)
        out_conv1 = self.conv1(out_norm2, training=training)
        out_drop2 = self.dropout2(out_conv1, training=training)
        out_conv2 = self.conv2(out_drop2, training=training)
        return out_conv2 + res

def build_model(
    n_classes,
    input_shape,
    head_size,
    num_heads,
    ff_dim,
    num_transformer_blocks,
    mlp_units,
    dropout=0.0,
    mlp_dropout=0.0,
    mask=None,
) -> keras.Model:
    inputs = keras.Input(shape=input_shape)
    _x = inputs
    if mask is not None:
        _x = layers.Masking(mask_value=mask)(_x)
    for _ in range(num_transformer_blocks):
        _x = TransformerBlock(
            head_size,
            num_heads,
            ff_dim,
            inputs.shape[-1],
            dropout,
        )(_x)

    _x = layers.GlobalAveragePooling2D(data_format="channels_first")(_x)
    for dim in mlp_units:
        _x = layers.Dense(dim, activation="relu")(_x)
        _x = layers.Dropout(mlp_dropout)(_x)
    outputs = layers.Dense(n_classes, activation="softmax")(_x)
    return keras.Model(inputs, outputs)
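
For example (a minimal sketch with arbitrary hyperparameters), a Masking layer in front of a single TransformerBlock can be wired up like this. Note that the NaN padding has to be replaced by a finite sentinel first (here 0.0), because layers.Masking detects padded timesteps by comparing for equality and NaN never compares equal to itself:

import numpy as np
from tensorflow import keras
from tensorflow.keras import layers

PAD_VALUE = 0.0   # sentinel that replaces the NaN padding from the question

# one padded and one full item, as in the question
data = np.array([
    [[1, 2, 3], [1, 2, 3], [PAD_VALUE] * 3, [PAD_VALUE] * 3],
    [[1, 2, 3], [5, 8, 2], [3, 7, 8], [4, 6, 2]],
], dtype=np.float32)

inputs = keras.Input(shape=(4, 3))
x = layers.Masking(mask_value=PAD_VALUE)(inputs)   # produces the (batch, 4) mask
x = TransformerBlock(head_size=8, num_heads=2, ff_dim=16, ff_dim2=3)(x)
model = keras.Model(inputs, x)

print(model(data, training=False).shape)           # (2, 4, 3)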

0 votes

Hi, I'm new to AI and I'm also working through this example of time-series classification with transformer blocks.

Apart from the padding question, may I ask why the GlobalAveragePooling2D layer uses "channels_first" instead of "channels_last"?

I have 2D data like yours and reshaped it to (batch, height, width, 1). With "channels_first" the accuracy reaches 9X%, but not with "channels_last".

The Keras example uses 1D data with "channels_last", but that also gives poor accuracy.
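
A quick shape check (assuming the input really is reshaped to (batch, height, width, 1)) suggests the two settings pool over different axes, which might be part of the difference:

import tensorflow as tf
from tensorflow.keras import layers

x = tf.random.normal((8, 4, 3, 1))   # (batch, height, width, 1)

# channels_last: averages over (height, width) -> one value per channel
print(layers.GlobalAveragePooling2D(data_format="channels_last")(x).shape)   # (8, 1)

# channels_first: treats axis 1 as channels, averages over the last two axes
print(layers.GlobalAveragePooling2D(data_format="channels_first")(x).shape)  # (8, 4)

So with "channels_first" the pooling keeps one value per row of the sample, while "channels_last" collapses each sample into a single number before the dense layers.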
