Custom encoder and decoder layers in a Keras model show as unbuilt


I have a Seq2Seq model that subclasses tensorflow.keras.Model and uses custom layers. However, when I run a test script to build and compile the model, model.summary() gives:

Model: "retrosynthesis_seq2_seq_model"
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Layer (type)                    ┃ Output Shape           ┃       Param # ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ simple_encoder (SimpleEncoder)  │ ?                      │   0 (unbuilt) │
├─────────────────────────────────┼────────────────────────┼───────────────┤
│ simple_decoder (SimpleDecoder)  │ ?                      │   0 (unbuilt) │
├─────────────────────────────────┼────────────────────────┼───────────────┤
│ enc_state_h (Dense)             │ (1, 128)               │        16,512 │
├─────────────────────────────────┼────────────────────────┼───────────────┤
│ enc_state_c (Dense)             │ (1, 128)               │        16,512 │
└─────────────────────────────────┴────────────────────────┴───────────────┘
 Total params: 361,064 (1.38 MB)
 Trainable params: 361,064 (1.38 MB)
 Non-trainable params: 0 (0.00 B)
Model output shape: (1, 20, 1000)

As far as I can tell, I have implemented the build() methods of the encoder and decoder layers correctly. I believe this is also what causes the TypeError: Unsupported integer size (0) serialization error when I try to save the model.

I have included the encoder, decoder, and Seq2Seq model classes below, along with a test script for reproduction. I appreciate it is a lot of code, but pasting it into a single file and running it is enough to reproduce the error:

import numpy as np
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense, Layer, Embedding, Bidirectional, LSTM, Dropout
from tensorflow.keras.optimizers import Adam
from typing import Optional, Tuple, Any

"""
Encoder Layer
"""
class SimpleEncoder(Layer):
    def __init__(self, vocab_size: int, embedding_dim: int, units: int, dropout_rate: float = 0.2, **kwargs):
        super(SimpleEncoder, self).__init__(**kwargs)
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.units = units
        self.dropout_rate = dropout_rate

        self.embedding = Embedding(input_dim=vocab_size, output_dim=embedding_dim, mask_zero=True, name='simple_embedding')
        self.dense = Dense(units, activation='relu', name='simple_dense')
        self.dropout = Dropout(dropout_rate, name='simple_dropout')

    def build(self, input_shape):
        self.embedding.build(input_shape)

        embedding_output_shape = self.embedding.compute_output_shape(input_shape)
        self.dense.build(embedding_output_shape)

        dense_output_shape = self.dense.compute_output_shape(embedding_output_shape)
        self.dropout.build(dense_output_shape)

        super(SimpleEncoder, self).build(input_shape)

    def call(self, inputs: tf.Tensor, training: Optional[bool] = None) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
        x = self.embedding(inputs)  # Shape: (batch_size, sequence_length, embedding_dim)

        encoder_output = self.dense(x)  # Shape: (batch_size, sequence_length, units)

        encoder_output = self.dropout(encoder_output, training=training)

        state_h = tf.zeros_like(encoder_output[:, 0, :])  # Shape: (batch_size, units)
        state_c = tf.zeros_like(encoder_output[:, 0, :])  # Shape: (batch_size, units)

        return encoder_output, state_h, state_c

    def compute_mask(self, inputs: tf.Tensor, mask: Optional[tf.Tensor] = None) -> Optional[tf.Tensor]:
        return self.embedding.compute_mask(inputs, mask)

    def get_config(self) -> dict:
        config = super(SimpleEncoder, self).get_config()
        config.update({
            'vocab_size': self.vocab_size,
            'embedding_dim': self.embedding_dim,
            'units': self.units,
            'dropout_rate': self.dropout_rate,
            'embedding': tf.keras.layers.serialize(self.embedding),
            'dense': tf.keras.layers.serialize(self.dense),
            'dropout': tf.keras.layers.serialize(self.dropout),
        })
        return config

    @classmethod
    def from_config(cls, config: dict) -> 'SimpleEncoder':
        config['embedding'] = tf.keras.layers.deserialize(config['embedding'])
        config['dense'] = tf.keras.layers.deserialize(config['dense'])
        config['dropout'] = tf.keras.layers.deserialize(config['dropout'])
        return cls(**config)

"""
Decoder Layer
"""
class SimpleDecoder(Layer):
    def __init__(
        self,
        vocab_size: int,
        embedding_dim: int,
        units: int,
        dropout_rate: float = 0.2,
        **kwargs
    ):
        super(SimpleDecoder, self).__init__(**kwargs)
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.units = units
        self.dropout_rate = dropout_rate

        self.embedding = Embedding(
            input_dim=vocab_size,
            output_dim=embedding_dim,
            mask_zero=True,
            name='decoder_embedding'
        )
        self.lstm = LSTM(
            units,
            return_sequences=True,
            return_state=True,
            name='decoder_lstm'
        )
        self.dropout = Dropout(dropout_rate, name='decoder_dropout')
        self.dense = Dense(vocab_size, activation='softmax', name='decoder_dense')

    def build(self, input_shape):
        decoder_input_shape, initial_states_shape = input_shape

        self.embedding.build(decoder_input_shape)

        embedding_output_shape = self.embedding.compute_output_shape(decoder_input_shape)
        self.lstm.build(embedding_output_shape)

        lstm_output_shape = self.lstm.compute_output_shape(embedding_output_shape)
        self.dropout.build(lstm_output_shape)

        dropout_output_shape = self.dropout.compute_output_shape(lstm_output_shape)
        self.dense.build(dropout_output_shape)

        super(SimpleDecoder, self).build(input_shape)

    def call(
        self,
        inputs: Tuple[tf.Tensor, Tuple[tf.Tensor, tf.Tensor]],
        training: Optional[bool] = None,
        mask: Optional[tf.Tensor] = None
    ) -> tf.Tensor:
        decoder_input, initial_state = inputs

        if decoder_input is None or initial_state is None:
            raise ValueError('decoder_input and initial_state must be provided to the Decoder.')

        x = self.embedding(decoder_input)

        lstm_output, state_h, state_c = self.lstm(
            x,
            initial_state=initial_state,
            training=training,
            mask=None
        )

        lstm_output = self.dropout(lstm_output, training=training)

        output = self.dense(lstm_output)

        return output

    @staticmethod
    def compute_mask(inputs: Tuple, mask: Optional[tf.Tensor] = None) -> None:
        return None

    def get_config(self) -> dict:
        config = super(SimpleDecoder, self).get_config()
        config.update({
            'vocab_size': self.vocab_size,
            'embedding_dim': self.embedding_dim,
            'units': self.units,
            'dropout_rate': self.dropout_rate,
            'embedding': tf.keras.layers.serialize(self.embedding),
            'lstm': tf.keras.layers.serialize(self.lstm),
            'dropout': tf.keras.layers.serialize(self.dropout),
            'dense': tf.keras.layers.serialize(self.dense),
        })
        return config

    @classmethod
    def from_config(cls, config: dict) -> 'SimpleDecoder':
        config['embedding'] = tf.keras.layers.deserialize(config['embedding'])
        config['lstm'] = tf.keras.layers.deserialize(config['lstm'])
        config['dropout'] = tf.keras.layers.deserialize(config['dropout'])
        config['dense'] = tf.keras.layers.deserialize(config['dense'])
        return cls(**config)

"""
Seq2Seq Model
"""
class RetrosynthesisSeq2SeqModel(Model):
    def __init__(self, input_vocab_size: int, output_vocab_size: int, encoder_embedding_dim: int,
                 decoder_embedding_dim: int, units: int, dropout_rate: float = 0.2, *args, **kwargs):
        super(RetrosynthesisSeq2SeqModel, self).__init__(*args, **kwargs)

        self.units: int = units

        self.encoder: SimpleEncoder = SimpleEncoder(
            input_vocab_size, encoder_embedding_dim, units, dropout_rate
        )

        self.decoder: SimpleDecoder = SimpleDecoder(
            output_vocab_size, decoder_embedding_dim, units, dropout_rate
        )

        self.input_vocab_size: int = input_vocab_size
        self.output_vocab_size: int = output_vocab_size

        self.enc_state_h: Dense = Dense(units, name='enc_state_h')
        self.enc_state_c: Dense = Dense(units, name='enc_state_c')

        self.encoder_data_processor: Optional[Any] = None
        self.decoder_data_processor: Optional[Any] = None

        self.dropout_rate: float = dropout_rate

    def build(self, input_shape):
        encoder_input_shape, decoder_input_shape = input_shape

        encoder_dummy = tf.zeros(encoder_input_shape)
        decoder_dummy = tf.zeros(decoder_input_shape)

        self.call((encoder_dummy, decoder_dummy), training=False)

        super(RetrosynthesisSeq2SeqModel, self).build(input_shape)

    def call(self, inputs: Tuple[tf.Tensor, tf.Tensor], training: Optional[bool] = None) -> tf.Tensor:
        encoder_input, decoder_input = inputs

        encoder_output, state_h, state_c = self.encoder.call(encoder_input, training=training)

        decoder_initial_state_h: tf.Tensor = self.enc_state_h(state_h)
        decoder_initial_state_c: tf.Tensor = self.enc_state_c(state_c)
        decoder_initial_state: Tuple[tf.Tensor, tf.Tensor] = (decoder_initial_state_h, decoder_initial_state_c)

        decoder_inputs = (
            decoder_input,
            decoder_initial_state
        )

        encoder_mask: Optional[tf.Tensor] = self.encoder.compute_mask(encoder_input)

        output: tf.Tensor = self.decoder.call(
            decoder_inputs,
            training=training,
            mask=encoder_mask
        )

        return output

    def get_config(self) -> dict:
        config = super(RetrosynthesisSeq2SeqModel, self).get_config()
        config.update({
            'units': self.units,
            'input_vocab_size': self.input_vocab_size,
            'output_vocab_size': self.output_vocab_size,
            'encoder_embedding_dim': self.encoder.embedding.output_dim,
            'decoder_embedding_dim': self.decoder.embedding.output_dim,
            'dropout_rate': self.dropout_rate,
            'encoder': tf.keras.layers.serialize(self.encoder),
            'decoder': tf.keras.layers.serialize(self.decoder),
            'enc_state_h': tf.keras.layers.serialize(self.enc_state_h),
            'enc_state_c': tf.keras.layers.serialize(self.enc_state_c)
        })
        return config

    @classmethod
    def from_config(cls, config: dict) -> 'RetrosynthesisSeq2SeqModel':
        config['encoder'] = tf.keras.layers.deserialize(config['encoder'])
        config['decoder'] = tf.keras.layers.deserialize(config['decoder'])
        config['enc_state_h'] = tf.keras.layers.deserialize(config['enc_state_h'])
        config['enc_state_c'] = tf.keras.layers.deserialize(config['enc_state_c'])
        return cls(**config)

"""
Test Script
"""
input_vocab_size = 1000
output_vocab_size = 1000
encoder_embedding_dim = 32
decoder_embedding_dim = 64
units = 128
dropout_rate = 0.2

model = RetrosynthesisSeq2SeqModel(
    input_vocab_size=input_vocab_size,
    output_vocab_size=output_vocab_size,
    encoder_embedding_dim=encoder_embedding_dim,
    decoder_embedding_dim=decoder_embedding_dim,
    units=units,
    dropout_rate=dropout_rate
)

encoder_seq_length = 20
decoder_seq_length = 20
model.build(input_shape=[(1, encoder_seq_length), (1, decoder_seq_length)])

sample_encoder_input = np.random.randint(0, input_vocab_size, size=(1, 20))
sample_decoder_input = np.random.randint(0, output_vocab_size, size=(1, 20))

learning_rate: float = 0.0001
optimizer: Adam = Adam(learning_rate=learning_rate, clipnorm=5.0)

model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

model.summary()

output = model([sample_encoder_input, sample_decoder_input])
print("Model output shape:", output.shape)

model.save('minimal_seq2seq_model.keras')
print("Model saved successfully.")
1 Answer

The encoder and decoder were not built because I was invoking their call() methods directly:

encoder_output, state_h, state_c = self.encoder.call(encoder_input, training=training)
output: tf.Tensor = self.decoder.call(
            decoder_inputs,
            training=training,
            mask=encoder_mask
        )

After changing this to invoke the layers directly, the encoder and decoder layers show as built in the model.summary() output:

encoder_output, state_h, state_c = self.encoder(encoder_input, training=training)
output: tf.Tensor = self.decoder(
            decoder_inputs,
            training=training,
            mask=encoder_mask
        )
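
As a quick sanity check against the test script above, both custom layers should now report as built after the model is built (a minimal check, assuming the same model and input shapes):

model.build(input_shape=[(1, 20), (1, 20)])
print(model.encoder.built, model.decoder.built)  # expected: True True after the fix
model.summary()  # SimpleEncoder and SimpleDecoder now report their parameter counts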

As far as I understand, calling a layer's call() method directly bypasses the internal Keras machinery (the __call__ wrapper) that builds and tracks layers, so they were never built or tracked correctly.
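
The behaviour can be seen in isolation with a minimal sketch (the Wrapper layer below is a hypothetical stand-in for the encoder/decoder, for illustration only): going through __call__ marks the layer as built, while invoking call() directly leaves built as False even though the inner sublayer still creates its weights.

import tensorflow as tf
from tensorflow.keras.layers import Layer, Dense

class Wrapper(Layer):
    # Hypothetical layer wrapping a single Dense sublayer, for illustration only
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.inner = Dense(4)

    def call(self, inputs):
        return self.inner(inputs)

x = tf.zeros((2, 3))

a = Wrapper()
a.call(x)        # bypasses __call__: the inner Dense still builds itself here,
print(a.built)   # but the Wrapper itself is never marked as built -> False

b = Wrapper()
b(x)             # goes through __call__, which runs build() and sets built = True
print(b.built)   # True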
