I have a Seq2Seq model that subclasses tensorflow.keras.Model and uses custom layers. However, when I run a test script that builds and compiles the model, calling model.summary() gives:
Model: "retrosynthesis_seq2_seq_model"
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Layer (type) ┃ Output Shape ┃ Param # ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ simple_encoder (SimpleEncoder) │ ? │ 0 (unbuilt) │
├─────────────────────────────────┼────────────────────────┼───────────────┤
│ simple_decoder (SimpleDecoder) │ ? │ 0 (unbuilt) │
├─────────────────────────────────┼────────────────────────┼───────────────┤
│ enc_state_h (Dense) │ (1, 128) │ 16,512 │
├─────────────────────────────────┼────────────────────────┼───────────────┤
│ enc_state_c (Dense) │ (1, 128) │ 16,512 │
└─────────────────────────────────┴────────────────────────┴───────────────┘
Total params: 361,064 (1.38 MB)
Trainable params: 361,064 (1.38 MB)
Non-trainable params: 0 (0.00 B)
Model output shape: (1, 20, 1000)
As far as I can tell, I have implemented the build() methods of the encoder and decoder layers correctly. I believe this is what causes the TypeError: Unsupported integer size (0) serialization error when I try to save the model.
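As a quick check that mirrors the summary above (this line is not part of the original script): after building the model as in the test script below, inspecting the built flags of the sub-layers should, as far as I can tell, print False for both:

print(model.encoder.built, model.decoder.built)  # expected: False False, matching the "0 (unbuilt)" entries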
I have included the encoder, decoder, and Seq2Seq model classes below, together with a test script. I appreciate that it is a lot of code, but pasting it into a single file and running it is enough to reproduce the error:
import numpy as np
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense, Layer, Embedding, Bidirectional, LSTM, Dropout
from tensorflow.keras.optimizers import Adam
from typing import Optional, Tuple, Any
"""
Encoder Layer
"""
class SimpleEncoder(Layer):
    def __init__(self, vocab_size: int, embedding_dim: int, units: int, dropout_rate: float = 0.2, **kwargs):
        super(SimpleEncoder, self).__init__(**kwargs)
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.units = units
        self.dropout_rate = dropout_rate
        self.embedding = Embedding(input_dim=vocab_size, output_dim=embedding_dim, mask_zero=True, name='simple_embedding')
        self.dense = Dense(units, activation='relu', name='simple_dense')
        self.dropout = Dropout(dropout_rate, name='simple_dropout')

    def build(self, input_shape):
        self.embedding.build(input_shape)
        embedding_output_shape = self.embedding.compute_output_shape(input_shape)
        self.dense.build(embedding_output_shape)
        dense_output_shape = self.dense.compute_output_shape(embedding_output_shape)
        self.dropout.build(dense_output_shape)
        super(SimpleEncoder, self).build(input_shape)

    def call(self, inputs: tf.Tensor, training: Optional[bool] = None) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
        x = self.embedding(inputs)  # Shape: (batch_size, sequence_length, embedding_dim)
        encoder_output = self.dense(x)  # Shape: (batch_size, sequence_length, units)
        encoder_output = self.dropout(encoder_output, training=training)
        state_h = tf.zeros_like(encoder_output[:, 0, :])  # Shape: (batch_size, units)
        state_c = tf.zeros_like(encoder_output[:, 0, :])  # Shape: (batch_size, units)
        return encoder_output, state_h, state_c

    def compute_mask(self, inputs: tf.Tensor, mask: Optional[tf.Tensor] = None) -> Optional[tf.Tensor]:
        return self.embedding.compute_mask(inputs, mask)

    def get_config(self) -> dict:
        config = super(SimpleEncoder, self).get_config()
        config.update({
            'vocab_size': self.vocab_size,
            'embedding_dim': self.embedding_dim,
            'units': self.units,
            'dropout_rate': self.dropout_rate,
            'embedding': tf.keras.layers.serialize(self.embedding),
            'dense': tf.keras.layers.serialize(self.dense),
            'dropout': tf.keras.layers.serialize(self.dropout),
        })
        return config

    @classmethod
    def from_config(cls, config: dict) -> 'SimpleEncoder':
        config['embedding'] = tf.keras.layers.deserialize(config['embedding'])
        config['dense'] = tf.keras.layers.deserialize(config['dense'])
        config['dropout'] = tf.keras.layers.deserialize(config['dropout'])
        return cls(**config)
"""
Decoder Layer
"""
class SimpleDecoder(Layer):
    def __init__(
        self,
        vocab_size: int,
        embedding_dim: int,
        units: int,
        dropout_rate: float = 0.2,
        **kwargs
    ):
        super(SimpleDecoder, self).__init__(**kwargs)
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.units = units
        self.dropout_rate = dropout_rate
        self.embedding = Embedding(
            input_dim=vocab_size,
            output_dim=embedding_dim,
            mask_zero=True,
            name='decoder_embedding'
        )
        self.lstm = LSTM(
            units,
            return_sequences=True,
            return_state=True,
            name='decoder_lstm'
        )
        self.dropout = Dropout(dropout_rate, name='decoder_dropout')
        self.dense = Dense(vocab_size, activation='softmax', name='decoder_dense')

    def build(self, input_shape):
        decoder_input_shape, initial_states_shape = input_shape
        self.embedding.build(decoder_input_shape)
        embedding_output_shape = self.embedding.compute_output_shape(decoder_input_shape)
        self.lstm.build(embedding_output_shape)
        lstm_output_shape = self.lstm.compute_output_shape(embedding_output_shape)
        self.dropout.build(lstm_output_shape)
        dropout_output_shape = self.dropout.compute_output_shape(lstm_output_shape)
        self.dense.build(dropout_output_shape)
        super(SimpleDecoder, self).build(input_shape)

    def call(
        self,
        inputs: Tuple[tf.Tensor, Tuple[tf.Tensor, tf.Tensor]],
        training: Optional[bool] = None,
        mask: Optional[tf.Tensor] = None
    ) -> tf.Tensor:
        decoder_input, initial_state = inputs
        if decoder_input is None or initial_state is None:
            raise ValueError('decoder_input and initial_state must be provided to the Decoder.')
        x = self.embedding(decoder_input)
        lstm_output, state_h, state_c = self.lstm(
            x,
            initial_state=initial_state,
            training=training,
            mask=None
        )
        lstm_output = self.dropout(lstm_output, training=training)
        output = self.dense(lstm_output)
        return output

    @staticmethod
    def compute_mask(inputs: Tuple, mask: Optional[tf.Tensor] = None) -> None:
        return None

    def get_config(self) -> dict:
        config = super(SimpleDecoder, self).get_config()
        config.update({
            'vocab_size': self.vocab_size,
            'embedding_dim': self.embedding_dim,
            'units': self.units,
            'dropout_rate': self.dropout_rate,
            'embedding': tf.keras.layers.serialize(self.embedding),
            'lstm': tf.keras.layers.serialize(self.lstm),
            'dropout': tf.keras.layers.serialize(self.dropout),
            'dense': tf.keras.layers.serialize(self.dense),
        })
        return config

    @classmethod
    def from_config(cls, config: dict) -> 'SimpleDecoder':
        config['embedding'] = tf.keras.layers.deserialize(config['embedding'])
        config['lstm'] = tf.keras.layers.deserialize(config['lstm'])
        config['dropout'] = tf.keras.layers.deserialize(config['dropout'])
        config['dense'] = tf.keras.layers.deserialize(config['dense'])
        return cls(**config)
"""
Seq2Seq Model
"""
class RetrosynthesisSeq2SeqModel(Model):
    def __init__(self, input_vocab_size: int, output_vocab_size: int, encoder_embedding_dim: int,
                 decoder_embedding_dim: int, units: int, dropout_rate: float = 0.2, *args, **kwargs):
        super(RetrosynthesisSeq2SeqModel, self).__init__(*args, **kwargs)
        self.units: int = units
        self.encoder: SimpleEncoder = SimpleEncoder(
            input_vocab_size, encoder_embedding_dim, units, dropout_rate
        )
        self.decoder: SimpleDecoder = SimpleDecoder(
            output_vocab_size, decoder_embedding_dim, units, dropout_rate
        )
        self.input_vocab_size: int = input_vocab_size
        self.output_vocab_size: int = output_vocab_size
        self.enc_state_h: Dense = Dense(units, name='enc_state_h')
        self.enc_state_c: Dense = Dense(units, name='enc_state_c')
        self.encoder_data_processor: Optional[Any] = None
        self.decoder_data_processor: Optional[Any] = None
        self.dropout_rate: float = dropout_rate

    def build(self, input_shape):
        encoder_input_shape, decoder_input_shape = input_shape
        encoder_dummy = tf.zeros(encoder_input_shape)
        decoder_dummy = tf.zeros(decoder_input_shape)
        self.call((encoder_dummy, decoder_dummy), training=False)
        super(RetrosynthesisSeq2SeqModel, self).build(input_shape)

    def call(self, inputs: Tuple[tf.Tensor, tf.Tensor], training: Optional[bool] = None) -> tf.Tensor:
        encoder_input, decoder_input = inputs
        encoder_output, state_h, state_c = self.encoder.call(encoder_input, training=training)
        decoder_initial_state_h: tf.Tensor = self.enc_state_h(state_h)
        decoder_initial_state_c: tf.Tensor = self.enc_state_c(state_c)
        decoder_initial_state: Tuple[tf.Tensor, tf.Tensor] = (decoder_initial_state_h, decoder_initial_state_c)
        decoder_inputs = (
            decoder_input,
            decoder_initial_state
        )
        encoder_mask: Optional[tf.Tensor] = self.encoder.compute_mask(encoder_input)
        output: tf.Tensor = self.decoder.call(
            decoder_inputs,
            training=training,
            mask=encoder_mask
        )
        return output

    def get_config(self) -> dict:
        config = super(RetrosynthesisSeq2SeqModel, self).get_config()
        config.update({
            'units': self.units,
            'input_vocab_size': self.input_vocab_size,
            'output_vocab_size': self.output_vocab_size,
            'encoder_embedding_dim': self.encoder.embedding.output_dim,
            'decoder_embedding_dim': self.decoder.embedding.output_dim,
            'dropout_rate': self.dropout_rate,
            'encoder': tf.keras.layers.serialize(self.encoder),
            'decoder': tf.keras.layers.serialize(self.decoder),
            'enc_state_h': tf.keras.layers.serialize(self.enc_state_h),
            'enc_state_c': tf.keras.layers.serialize(self.enc_state_c)
        })
        return config

    @classmethod
    def from_config(cls, config: dict) -> 'RetrosynthesisSeq2SeqModel':
        config['encoder'] = tf.keras.layers.deserialize(config['encoder'])
        config['decoder'] = tf.keras.layers.deserialize(config['decoder'])
        config['enc_state_h'] = tf.keras.layers.deserialize(config['enc_state_h'])
        config['enc_state_c'] = tf.keras.layers.deserialize(config['enc_state_c'])
        return cls(**config)
"""
Test Script
"""
input_vocab_size = 1000
output_vocab_size = 1000
encoder_embedding_dim = 32
decoder_embedding_dim = 64
units = 128
dropout_rate = 0.2
model = RetrosynthesisSeq2SeqModel(
    input_vocab_size=input_vocab_size,
    output_vocab_size=output_vocab_size,
    encoder_embedding_dim=encoder_embedding_dim,
    decoder_embedding_dim=decoder_embedding_dim,
    units=units,
    dropout_rate=dropout_rate
)
encoder_seq_length = 20
decoder_seq_length = 20
model.build(input_shape=[(1, encoder_seq_length), (1, decoder_seq_length)])
sample_encoder_input = np.random.randint(0, input_vocab_size, size=(1, 20))
sample_decoder_input = np.random.randint(0, output_vocab_size, size=(1, 20))
learning_rate: float = 0.0001
optimizer: Adam = Adam(learning_rate=learning_rate, clipnorm=5.0)
model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
model.summary()
output = model([sample_encoder_input, sample_decoder_input])
print("Model output shape:", output.shape)
model.save('minimal_seq2seq_model.keras')
print("Model saved successfully.")
The encoder and decoder are not built because I call their call() methods directly:
encoder_output, state_h, state_c = self.encoder.call(encoder_input, training=training)
output: tf.Tensor = self.decoder.call(
    decoder_inputs,
    training=training,
    mask=encoder_mask
)
After changing this to invoke the layers directly, the encoder and decoder layers show as built in the model.summary() output:
encoder_output, state_h, state_c = self.encoder(encoder_input, training=training)
output: tf.Tensor = self.decoder(
    decoder_inputs,
    training=training,
    mask=encoder_mask
)
As far as I understand, calling a layer's call() method directly bypasses the internal machinery Keras uses to build and track layers, so they are never built and tracked correctly.
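For completeness, here is a minimal sketch of the model's call() after that change; it is the same method as in the code above, with only the two sub-layer invocations switched from .call() to the layers themselves:

def call(self, inputs: Tuple[tf.Tensor, tf.Tensor], training: Optional[bool] = None) -> tf.Tensor:
    encoder_input, decoder_input = inputs
    # Invoking the layer object (its __call__) builds it on first use and registers it with Keras
    encoder_output, state_h, state_c = self.encoder(encoder_input, training=training)
    decoder_initial_state_h = self.enc_state_h(state_h)
    decoder_initial_state_c = self.enc_state_c(state_c)
    decoder_initial_state = (decoder_initial_state_h, decoder_initial_state_c)
    encoder_mask = self.encoder.compute_mask(encoder_input)
    output = self.decoder(
        (decoder_input, decoder_initial_state),
        training=training,
        mask=encoder_mask
    )
    return output

With this version, model.summary() reports the encoder and decoder layers as built rather than "0 (unbuilt)".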