我有一个带有自定义层的子类
tensorflow.keras.Model
Seq2Seq 模型。但是,当我尝试通过 tensorflow.keras.Model.save()
方法保存它时,它会抛出以下错误:
File "/home/anaconda3/envs/aizynth-env/lib/python3.10/site-packages/h5py/_hl/dataset.py", line 86, in make_new_dset
tid = h5t.py_create(dtype, logical=1)
File "h5py/h5t.pyx", line 1663, in h5py.h5t.py_create
File "h5py/h5t.pyx", line 1687, in h5py.h5t.py_create
File "h5py/h5t.pyx", line 1705, in h5py.h5t.py_create
File "h5py/h5t.pyx", line 1459, in h5py.h5t._c_int
TypeError: Unsupported integer size (0)
Process finished with exit code 1
据我了解,此问题源于 HDF5 格式尝试序列化它无法识别或无法处理的层或配置参数。
我的Tensorflow版本是2.17.0
import tensorflow as tf
from tensorflow.keras.layers import Layer, Embedding, Bidirectional, LSTM, Dropout
from typing import Tuple, Optional
class StackedBidirectionalLSTMEncoder(Layer):
def __init__(self, vocab_size: int, encoder_embedding_dim: int, units: int, dropout_rate: float = 0.2, **kwargs):
super(StackedBidirectionalLSTMEncoder, self).__init__(**kwargs)
self.units: int = units
self.embedding: Embedding = Embedding(vocab_size, encoder_embedding_dim, mask_zero=True)
self.dropout_rate: float = dropout_rate
self.bidirectional_lstm_1: Bidirectional = Bidirectional(
LSTM(units, return_sequences=True, return_state=True),
name='bidirectional_lstm_1'
)
self.dropout_1: Dropout = Dropout(dropout_rate, name='encoder_dropout_1')
self.bidirectional_lstm_2: Bidirectional = Bidirectional(
LSTM(units, return_sequences=True, return_state=True),
name='bidirectional_lstm_2'
)
self.dropout_2: Dropout = Dropout(dropout_rate, name='encoder_dropout_2')
def call(self, encoder_input: tf.Tensor, training: Optional[bool] = None):
# Embed the input and obtain mask
encoder_output: tf.Tensor = self.embedding(encoder_input)
mask = self.embedding.compute_mask(encoder_input)
# Process through encoder layers
# First LSTM layer
encoder_output, forward_h, forward_c, backward_h, backward_c = self.bidirectional_lstm_1(
encoder_output, mask=mask, training=training
)
# Concatenate forward and backward states
state_h_1: tf.Tensor = tf.concat([forward_h, backward_h], axis=-1)
state_c_1: tf.Tensor = tf.concat([forward_c, backward_c], axis=-1)
# Apply dropout
encoder_output: Optional[tf.Tensor] = self.dropout_1(encoder_output, training=training)
# Second LSTM layer
encoder_output, forward_h, forward_c, backward_h, backward_c = self.bidirectional_lstm_2(
encoder_output, mask=mask, training=training
)
# Concatenate forward and backward states
state_h_2: tf.Tensor = tf.concat([forward_h, backward_h], axis=-1)
state_c_2: tf.Tensor = tf.concat([forward_c, backward_c], axis=-1)
# Apply dropout
encoder_output: tf.Tensor = self.dropout_2(encoder_output, training=training)
# Final states
final_state_h: tf.Tensor = state_h_2
final_state_c: tf.Tensor = state_c_2
return encoder_output, final_state_h, final_state_c
def compute_mask(self, inputs: tf.Tensor, mask: Optional[tf.Tensor] = None) -> Optional[tf.Tensor]:
return self.embedding.compute_mask(inputs, mask)
def get_config(self) -> dict:
config = super(StackedBidirectionalLSTMEncoder, self).get_config()
config.update({
'vocab_size': self.embedding.input_dim,
'encoder_embedding_dim': self.embedding.output_dim,
'units': self.units,
'dropout_rate': self.dropout_rate,
'embedding': tf.keras.layers.serialize(self.embedding),
'bidirectional_lstm_1': tf.keras.layers.serialize(self.bidirectional_lstm_1),
'dropout_1': tf.keras.layers.serialize(self.dropout_1),
'bidirectional_lstm_2': tf.keras.layers.serialize(self.bidirectional_lstm_2),
'dropout_2': tf.keras.layers.serialize(self.dropout_2),
})
return config
@classmethod
def from_config(cls, config: dict) -> 'StackedBidirectionalLSTMEncoder':
# Deserialize layers
config['embedding'] = tf.keras.layers.deserialize(config['embedding'])
config['bidirectional_lstm_1'] = tf.keras.layers.deserialize(config['bidirectional_lstm_1'])
config['dropout_1'] = tf.keras.layers.deserialize(config['dropout_1'])
config['bidirectional_lstm_2'] = tf.keras.layers.deserialize(config['bidirectional_lstm_2'])
config['dropout_2'] = tf.keras.layers.deserialize(config['dropout_2'])
return cls(**config)
import tensorflow as tf
from tensorflow.keras.layers import Layer, Embedding, LSTM, Dropout, Dense
from typing import List, Optional, Tuple, Union, Any
class StackedLSTMDecoder(Layer):
def __init__(self, vocab_size: int, decoder_embedding_dim: int, units: int, dropout_rate: float = 0.2,
**kwargs) -> None:
super(StackedLSTMDecoder, self).__init__(**kwargs)
self.units: int = units
self.embedding: Embedding = Embedding(vocab_size, decoder_embedding_dim, mask_zero=True)
self.vocab_size: int = vocab_size
self.dropout_rate: float = dropout_rate
# Decoder: 4-layer LSTM without internal Dropout
# Define LSTM and Dropout layers individually
self.lstm_decoder_1: LSTM = LSTM(
units,
return_sequences=True,
return_state=True,
name='lstm_decoder_1'
)
self.dropout_1: Dropout = Dropout(dropout_rate, name='decoder_dropout_1')
self.lstm_decoder_2: LSTM = LSTM(
units,
return_sequences=True,
return_state=True,
name='lstm_decoder_2'
)
self.dropout_2: Dropout = Dropout(dropout_rate, name='decoder_dropout_2')
self.lstm_decoder_3: LSTM = LSTM(
units,
return_sequences=True,
return_state=True,
name='lstm_decoder_3'
)
self.dropout_3: Dropout = Dropout(dropout_rate, name='decoder_dropout_3')
self.lstm_decoder_4: LSTM = LSTM(
units,
return_sequences=True,
return_state=True,
name='lstm_decoder_4'
)
self.dropout_4: Dropout = Dropout(dropout_rate, name='decoder_dropout_4')
# Attention Mechanism
self.attention: BahdanauAttention = BahdanauAttention(units=units)
# Output layer
self.dense: Dense = Dense(vocab_size, activation='softmax')
def call(self, inputs: Tuple[tf.Tensor, List[tf.Tensor], tf.Tensor], training: Optional[bool] = None,
mask: Optional[tf.Tensor] = None) -> tf.Tensor:
# Extract initial state and encoder output from inputs
decoder_input, initial_state, encoder_output = inputs
if decoder_input is None or initial_state is None or encoder_output is None:
raise ValueError('decoder_input, initial_state and encoder_output must be provided to the Decoder.')
# Embed the input and extract decoder mask
decoder_output: tf.Tensor = self.embedding(decoder_input)
decoder_mask: Optional[tf.Tensor] = self.embedding.compute_mask(decoder_input)
# Process through decoder layers
# First LSTM layer with initial state
decoder_output, _, _ = self.lstm_decoder_1(
decoder_output,
mask=decoder_mask,
initial_state=initial_state,
training=training
)
decoder_output: tf.Tensor = self.dropout_1(decoder_output, training=training)
# Second LSTM layer
decoder_output, _, _ = self.lstm_decoder_2(
decoder_output,
mask=decoder_mask,
training=training
)
decoder_output: tf.Tensor = self.dropout_2(decoder_output, training=training)
# Third LSTM layer
decoder_output, _, _ = self.lstm_decoder_3(
decoder_output,
mask=decoder_mask,
training=training
)
decoder_output: tf.Tensor = self.dropout_3(decoder_output, training=training)
# Fourth LSTM layer
decoder_output, final_state_h, final_state_c = self.lstm_decoder_4(
decoder_output,
mask=decoder_mask,
training=training
)
decoder_output: tf.Tensor = self.dropout_4(decoder_output, training=training)
# Extract only the encoder_mask from the mask list
if mask is not None and isinstance(mask, (list, tuple)):
encoder_mask = mask[1]
else:
encoder_mask = mask
# Apply attention
context_vector, attention_weights = self.attention(
inputs=[encoder_output, decoder_output],
mask=encoder_mask
)
# Concatenate decoder outputs and context vector
concat_output: tf.Tensor = tf.concat([decoder_output, context_vector], axis=-1) # (batch_size, seq_len_dec, units + units_enc)
# Generate outputs
decoder_output: tf.Tensor = self.dense(concat_output) # (batch_size, seq_len_dec, vocab_size)
return decoder_output
@staticmethod
def compute_mask(inputs: Any, mask: Optional[Any] = None) -> None:
return None
def get_config(self) -> dict:
config = super(StackedLSTMDecoder, self).get_config()
config.update({
'vocab_size': self.vocab_size,
'decoder_embedding_dim': self.embedding.output_dim,
'units': self.units,
'dropout_rate': self.dropout_rate,
'embedding': tf.keras.layers.serialize(self.embedding),
'lstm_decoder_1': tf.keras.layers.serialize(self.lstm_decoder_1),
'dropout_1': tf.keras.layers.serialize(self.dropout_1),
'lstm_decoder_2': tf.keras.layers.serialize(self.lstm_decoder_2),
'dropout_2': tf.keras.layers.serialize(self.dropout_2),
'lstm_decoder_3': tf.keras.layers.serialize(self.lstm_decoder_3),
'dropout_3': tf.keras.layers.serialize(self.dropout_3),
'lstm_decoder_4': tf.keras.layers.serialize(self.lstm_decoder_4),
'dropout_4': tf.keras.layers.serialize(self.dropout_4),
'attention': tf.keras.layers.serialize(self.attention),
'dense': tf.keras.layers.serialize(self.dense),
})
return config
@classmethod
def from_config(cls, config: dict) -> 'StackedLSTMDecoder':
# Deserialize layers
config['embedding'] = tf.keras.layers.deserialize(config['embedding'])
config['lstm_decoder_1'] = tf.keras.layers.deserialize(config['lstm_decoder_1'])
config['dropout_1'] = tf.keras.layers.deserialize(config['dropout_1'])
config['lstm_decoder_2'] = tf.keras.layers.deserialize(config['lstm_decoder_2'])
config['dropout_2'] = tf.keras.layers.deserialize(config['dropout_2'])
config['lstm_decoder_3'] = tf.keras.layers.deserialize(config['lstm_decoder_3'])
config['dropout_3'] = tf.keras.layers.deserialize(config['dropout_3'])
config['lstm_decoder_4'] = tf.keras.layers.deserialize(config['lstm_decoder_4'])
config['dropout_4'] = tf.keras.layers.deserialize(config['dropout_4'])
config['attention'] = tf.keras.layers.deserialize(config['attention'])
config['dense'] = tf.keras.layers.deserialize(config['dense'])
return cls(**config)
import tensorflow as tf
from tensorflow.keras.layers import Layer, Dense
from attention.attention_interface import AttentionInterface
from typing import List, Optional, Tuple, Union
class BahdanauAttention(Layer):
def __init__(self, units: int, **kwargs):
super(BahdanauAttention, self).__init__(**kwargs)
self.units: int = units
self.attention_dense1: Dense = Dense(units, name='attention_dense1')
self.attention_dense2: Dense = Dense(units, name='attention_dense2')
self.attention_v: Dense = Dense(1, name='attention_v')
self.supports_masking: bool = True
def call(self, inputs: List[tf.Tensor], mask: Optional[tf.Tensor] = None,
training: Union[None, bool] = None) -> Tuple[tf.Tensor, tf.Tensor]:
# Unpack inputs
encoder_output, decoder_output = inputs
# Attention Mechanism
# Calculate attention scores
# Expand dimensions to match the shapes for broadcasting
encoder_output_expanded: tf.Tensor = tf.expand_dims(encoder_output,
1) # Shape: (batch_size, 1, seq_len_encoder, units*2)
decoder_output_expanded: tf.Tensor = tf.expand_dims(decoder_output,
2) # Shape: (batch_size, seq_len_decoder, 1, units)
# Compute the attention scores
score: tf.Tensor = tf.nn.tanh(
self.attention_dense1(encoder_output_expanded) + self.attention_dense2(decoder_output_expanded)
) # Shape: (batch_size, seq_len_decoder, seq_len_encoder, units)
# Apply mask if available
if mask is not None:
# If mask is a list or tuple, both encoder and decoder mask have been passed.
# Extract the encoder mask
if isinstance(mask, (list, tuple)):
encoder_mask: tf.Tensor = mask[0]
else:
encoder_mask = mask
if encoder_mask is not None:
# mask shape: (batch_size, seq_len_encoder)
# Expand mask to match score dimensions
encoder_mask = tf.cast(tf.expand_dims(encoder_mask, 1), dtype=score.dtype) # (batch_size, 1, seq_len_encoder)
encoder_mask = tf.expand_dims(encoder_mask, -1) # (batch_size, 1, seq_len_encoder, 1)
# Add a large negative value to masked positions to nullify their effect after softmax
score += (1.0 - encoder_mask) * -1e9
attention_weights: tf.Tensor = tf.nn.softmax(self.attention_v(score),
axis=2) # Shape: (batch_size, seq_len_decoder, seq_len_encoder, 1)
# Compute the context vector
context_vector: tf.Tensor = attention_weights * encoder_output_expanded # Shape: (batch_size, seq_len_decoder, seq_len_encoder, units*2)
context_vector: tf.Tensor = tf.reduce_sum(context_vector, axis=2) # Shape: (batch_size, seq_len_decoder, units*2)
return context_vector, attention_weights
@staticmethod
def compute_mask(inputs: List[tf.Tensor], mask: Optional[tf.Tensor] = None) -> None:
# This layer does not propagate the mask further
return None
def get_config(self) -> dict:
config = super(BahdanauAttention, self).get_config()
config.update({
'units': self.units,
'attention_dense1': tf.keras.layers.serialize(self.attention_dense1),
'attention_dense2': tf.keras.layers.serialize(self.attention_dense2),
'attention_v': tf.keras.layers.serialize(self.attention_v),
})
return config
@classmethod
def from_config(cls, config: dict) -> 'BahdanauAttention':
# Deserialize layers
config['attention_dense1'] = tf.keras.layers.deserialize(config['attention_dense1'])
config['attention_dense2'] = tf.keras.layers.deserialize(config['attention_dense2'])
config['attention_v'] = tf.keras.layers.deserialize(config['attention_v'])
return cls(**config)
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense
from tensorflow.train import Checkpoint, CheckpointManager
from tensorflow.keras.callbacks import Callback
from typing import Optional, Any, Tuple
class RetrosynthesisSeq2SeqModel(Model):
def __init__(self, input_vocab_size: int, output_vocab_size: int, encoder_embedding_dim: int,
decoder_embedding_dim: int, units: int, dropout_rate: float = 0.2, *args, **kwargs):
super(RetrosynthesisSeq2SeqModel, self).__init__(*args, **kwargs)
# Save the number of units (neurons)
self.units: int = units
# Encoder layer
self.encoder: StackedBidirectionalLSTMEncoder = StackedBidirectionalLSTMEncoder(
input_vocab_size, encoder_embedding_dim, units, dropout_rate
)
# Decoder layer
self.decoder: StackedLSTMDecoder = StackedLSTMDecoder(
output_vocab_size, decoder_embedding_dim, units, dropout_rate
)
# Save the vocabulary sizes
self.input_vocab_size: int = input_vocab_size
self.output_vocab_size: int = output_vocab_size
# Mapping encoder final states to decoder initial states
self.enc_state_h: Dense = Dense(units, name='enc_state_h')
self.enc_state_c: Dense = Dense(units, name='enc_state_c')
# Store the data processors (to be set externally)
self.encoder_data_processor: Optional[Any] = None
self.decoder_data_processor: Optional[Any] = None
# Save the dropout rate
self.dropout_rate: float = dropout_rate
def build(self, input_shape):
# Define the input shapes for encoder and decoder
encoder_input_shape, decoder_input_shape = input_shape
# Pass a dummy input through encoder and decoder to initialize weights
encoder_dummy = tf.zeros(encoder_input_shape)
decoder_dummy = tf.zeros(decoder_input_shape)
# Forward pass to build the model
self.call((encoder_dummy, decoder_dummy), training=False)
# Mark the model as built
super(RetrosynthesisSeq2SeqModel, self).build(input_shape)
def call(self, inputs: Tuple[tf.Tensor, tf.Tensor], training: Optional[bool] = None) -> tf.Tensor:
"""
Forward pass of the Seq2Seq model.
Args:
inputs (Tuple[tf.Tensor, tf.Tensor]): Tuple containing encoder and decoder inputs.
training (Optional[bool], optional): Training flag. Defaults to None.
Returns:
tf.Tensor: The output predictions from the decoder.
"""
# Extract encoder and decoder inputs
encoder_input, decoder_input = inputs
# Encoder
encoder_output, state_h, state_c = self.encoder.call(encoder_input, training=training)
# Map encoder final states to decoder initial states
decoder_initial_state_h: tf.Tensor = self.enc_state_h(state_h) # (batch_size, units)
decoder_initial_state_c: tf.Tensor = self.enc_state_c(state_c) # (batch_size, units)
decoder_initial_state: Tuple[tf.Tensor, tf.Tensor] = (decoder_initial_state_h, decoder_initial_state_c)
# Prepare decoder inputs as a tuple
decoder_inputs: Tuple[tf.Tensor, Tuple[tf.Tensor, tf.Tensor], tf.Tensor] = (
decoder_input,
decoder_initial_state,
encoder_output
)
# Extract encoder mask
encoder_mask: Optional[tf.Tensor] = self.encoder.compute_mask(encoder_input)
# Decoder
output: tf.Tensor = self.decoder.call(
decoder_inputs,
training=training,
mask=encoder_mask
)
return output
def get_config(self) -> dict:
config = super(RetrosynthesisSeq2SeqModel, self).get_config()
config.update({
'units': self.units,
'input_vocab_size': self.input_vocab_size,
'output_vocab_size': self.output_vocab_size,
'encoder_embedding_dim': self.encoder.embedding.output_dim,
'decoder_embedding_dim': self.decoder.embedding.output_dim,
'dropout_rate': self.dropout_rate,
'encoder': tf.keras.layers.serialize(self.encoder),
'decoder': tf.keras.layers.serialize(self.decoder),
'enc_state_h': tf.keras.layers.serialize(self.enc_state_h),
'enc_state_c': tf.keras.layers.serialize(self.enc_state_c)
})
return config
@classmethod
def from_config(cls, config: dict) -> 'RetrosynthesisSeq2SeqModel':
# Deserialize layers
config['encoder'] = tf.keras.layers.deserialize(config['encoder'])
config['decoder'] = tf.keras.layers.deserialize(config['decoder'])
config['enc_state_h'] = tf.keras.layers.deserialize(config['enc_state_h'])
config['enc_state_c'] = tf.keras.layers.deserialize(config['enc_state_c'])
return cls(**config)
#!/usr/bin/env python3
import numpy as np
from tensorflow.keras.optimizers import Adam
input_vocab_size = 1000
output_vocab_size = 1000
encoder_embedding_dim = 32
decoder_embedding_dim = 64
units = 128
dropout_rate = 0.2
model = RetrosynthesisSeq2SeqModel(
input_vocab_size=input_vocab_size,
output_vocab_size=output_vocab_size,
encoder_embedding_dim=encoder_embedding_dim,
decoder_embedding_dim=decoder_embedding_dim,
units=units,
dropout_rate=dropout_rate
)
encoder_input_shape = (1, 20) # (batch_size, sequence_length)
decoder_input_shape = (1, 20) # (batch_size, sequence_length)
model.build([encoder_input_shape, decoder_input_shape])
sample_encoder_input = np.random.randint(0, input_vocab_size, size=(1, 20))
sample_decoder_input = np.random.randint(0, output_vocab_size, size=(1, 20))
learning_rate: float = 0.0001
optimizer: Adam = Adam(learning_rate=learning_rate, clipnorm=5.0)
model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
output = model([sample_encoder_input, sample_decoder_input])
print("Model output shape:", output.shape)
model.save('minimal_seq2seq_model.keras')
print("Model saved successfully.")
我也遇到了这个问题,但我没有使用自定义图层。对我来说解决这个问题的是创建一个新的 conda 环境并降级到 TensorFlow 2.12.0 和 h5py 3.6.0。
conda create -n tf_env python=3.10
conda activate tf_env
conda install h5py=3.6.0
conda install tensorflow[and-cuda]
你也可以指定TensorFlow版本,但我的默认下载2.12.0。
希望这有帮助!