`TypeError when trying to save a custom Keras model: Unsupported integer size (0)`

Question — votes: 0, answers: 1

I have a Seq2Seq model, subclassed from tensorflow.keras.Model, with custom layers. However, when I try to save it via the tensorflow.keras.Model.save() method, it throws the following error:

  File "/home/anaconda3/envs/aizynth-env/lib/python3.10/site-packages/h5py/_hl/dataset.py", line 86, in make_new_dset
    tid = h5t.py_create(dtype, logical=1)
  File "h5py/h5t.pyx", line 1663, in h5py.h5t.py_create
  File "h5py/h5t.pyx", line 1687, in h5py.h5t.py_create
  File "h5py/h5t.pyx", line 1705, in h5py.h5t.py_create
  File "h5py/h5t.pyx", line 1459, in h5py.h5t._c_int
TypeError: Unsupported integer size (0)

Process finished with exit code 1

As I understand it, the problem arises because the HDF5 format tries to serialize a layer or configuration parameter that it does not recognize or cannot handle.
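One way to test that hypothesis without involving HDF5 at all is to round-trip the config in memory. A minimal sketch, assuming model is the instance built in the script at the end of this post; if this also raises, the problem lies in get_config/from_config rather than in h5py:

config = model.get_config()
rebuilt = RetrosynthesisSeq2SeqModel.from_config(config)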

My TensorFlow version is 2.17.0.
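Since the traceback ends inside h5py, the installed h5py version is likely relevant as well; a quick check:

import tensorflow as tf
import h5py
print(tf.__version__, h5py.__version__)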

Minimal reproducible example code

Encoder layer

import tensorflow as tf
from tensorflow.keras.layers import Layer, Embedding, Bidirectional, LSTM, Dropout
from typing import Tuple, Optional

class StackedBidirectionalLSTMEncoder(Layer):
    def __init__(self, vocab_size: int, encoder_embedding_dim: int, units: int, dropout_rate: float = 0.2, **kwargs):
        super(StackedBidirectionalLSTMEncoder, self).__init__(**kwargs)
        self.units: int = units
        self.embedding: Embedding = Embedding(vocab_size, encoder_embedding_dim, mask_zero=True)
        self.dropout_rate: float = dropout_rate

        self.bidirectional_lstm_1: Bidirectional = Bidirectional(
            LSTM(units, return_sequences=True, return_state=True),
            name='bidirectional_lstm_1'
        )

        self.dropout_1: Dropout = Dropout(dropout_rate, name='encoder_dropout_1')

        self.bidirectional_lstm_2: Bidirectional = Bidirectional(
            LSTM(units, return_sequences=True, return_state=True),
            name='bidirectional_lstm_2'
        )

        self.dropout_2: Dropout = Dropout(dropout_rate, name='encoder_dropout_2')

    def call(self, encoder_input: tf.Tensor, training: Optional[bool] = None):
        # Embed the input and obtain mask
        encoder_output: tf.Tensor = self.embedding(encoder_input)
        mask = self.embedding.compute_mask(encoder_input)

        # Process through encoder layers
        # First LSTM layer
        encoder_output, forward_h, forward_c, backward_h, backward_c = self.bidirectional_lstm_1(
            encoder_output, mask=mask, training=training
        )
        # Concatenate forward and backward states
        state_h_1: tf.Tensor = tf.concat([forward_h, backward_h], axis=-1)
        state_c_1: tf.Tensor = tf.concat([forward_c, backward_c], axis=-1)

        # Apply dropout
        encoder_output: Optional[tf.Tensor] = self.dropout_1(encoder_output, training=training)

        # Second LSTM layer
        encoder_output, forward_h, forward_c, backward_h, backward_c = self.bidirectional_lstm_2(
            encoder_output, mask=mask, training=training
        )

        # Concatenate forward and backward states
        state_h_2: tf.Tensor = tf.concat([forward_h, backward_h], axis=-1)
        state_c_2: tf.Tensor = tf.concat([forward_c, backward_c], axis=-1)

        # Apply dropout
        encoder_output: tf.Tensor = self.dropout_2(encoder_output, training=training)

        # Final states
        final_state_h: tf.Tensor = state_h_2
        final_state_c: tf.Tensor = state_c_2

        return encoder_output, final_state_h, final_state_c

    def compute_mask(self, inputs: tf.Tensor, mask: Optional[tf.Tensor] = None) -> Optional[tf.Tensor]:
        return self.embedding.compute_mask(inputs, mask)

    def get_config(self) -> dict:
        config = super(StackedBidirectionalLSTMEncoder, self).get_config()
        config.update({
            'vocab_size': self.embedding.input_dim,
            'encoder_embedding_dim': self.embedding.output_dim,
            'units': self.units,
            'dropout_rate': self.dropout_rate,
            'embedding': tf.keras.layers.serialize(self.embedding),
            'bidirectional_lstm_1': tf.keras.layers.serialize(self.bidirectional_lstm_1),
            'dropout_1': tf.keras.layers.serialize(self.dropout_1),
            'bidirectional_lstm_2': tf.keras.layers.serialize(self.bidirectional_lstm_2),
            'dropout_2': tf.keras.layers.serialize(self.dropout_2),
        })
        return config

    @classmethod
    def from_config(cls, config: dict) -> 'StackedBidirectionalLSTMEncoder':
        # Deserialize layers
        config['embedding'] = tf.keras.layers.deserialize(config['embedding'])
        config['bidirectional_lstm_1'] = tf.keras.layers.deserialize(config['bidirectional_lstm_1'])
        config['dropout_1'] = tf.keras.layers.deserialize(config['dropout_1'])
        config['bidirectional_lstm_2'] = tf.keras.layers.deserialize(config['bidirectional_lstm_2'])
        config['dropout_2'] = tf.keras.layers.deserialize(config['dropout_2'])
        return cls(**config)

Decoder layer

import tensorflow as tf
from tensorflow.keras.layers import Layer, Embedding, LSTM, Dropout, Dense
from typing import List, Optional, Tuple, Union, Any

# BahdanauAttention (defined in the attention layer section below) must be
# in scope here.


class StackedLSTMDecoder(Layer):
    def __init__(self, vocab_size: int, decoder_embedding_dim: int, units: int, dropout_rate: float = 0.2,
                 **kwargs) -> None:
        super(StackedLSTMDecoder, self).__init__(**kwargs)
        self.units: int = units
        self.embedding: Embedding = Embedding(vocab_size, decoder_embedding_dim, mask_zero=True)
        self.vocab_size: int = vocab_size
        self.dropout_rate: float = dropout_rate

        # Decoder: 4-layer LSTM without internal Dropout
        # Define LSTM and Dropout layers individually
        self.lstm_decoder_1: LSTM = LSTM(
            units,
            return_sequences=True,
            return_state=True,
            name='lstm_decoder_1'
        )
        self.dropout_1: Dropout = Dropout(dropout_rate, name='decoder_dropout_1')

        self.lstm_decoder_2: LSTM = LSTM(
            units,
            return_sequences=True,
            return_state=True,
            name='lstm_decoder_2'
        )
        self.dropout_2: Dropout = Dropout(dropout_rate, name='decoder_dropout_2')

        self.lstm_decoder_3: LSTM = LSTM(
            units,
            return_sequences=True,
            return_state=True,
            name='lstm_decoder_3'
        )
        self.dropout_3: Dropout = Dropout(dropout_rate, name='decoder_dropout_3')

        self.lstm_decoder_4: LSTM = LSTM(
            units,
            return_sequences=True,
            return_state=True,
            name='lstm_decoder_4'
        )
        self.dropout_4: Dropout = Dropout(dropout_rate, name='decoder_dropout_4')

        # Attention Mechanism
        self.attention: BahdanauAttention = BahdanauAttention(units=units)

        # Output layer
        self.dense: Dense = Dense(vocab_size, activation='softmax')

    def call(self, inputs: Tuple[tf.Tensor, List[tf.Tensor], tf.Tensor], training: Optional[bool] = None,
             mask: Optional[tf.Tensor] = None) -> tf.Tensor:
        # Extract initial state and encoder output from inputs
        decoder_input, initial_state, encoder_output = inputs

        if decoder_input is None or initial_state is None or encoder_output is None:
            raise ValueError('decoder_input, initial_state and encoder_output must be provided to the Decoder.')

        # Embed the input and extract decoder mask
        decoder_output: tf.Tensor = self.embedding(decoder_input)
        decoder_mask: Optional[tf.Tensor] = self.embedding.compute_mask(decoder_input)

        # Process through decoder layers
        # First LSTM layer with initial state
        decoder_output, _, _ = self.lstm_decoder_1(
            decoder_output,
            mask=decoder_mask,
            initial_state=initial_state,
            training=training
        )
        decoder_output: tf.Tensor = self.dropout_1(decoder_output, training=training)

        # Second LSTM layer
        decoder_output, _, _ = self.lstm_decoder_2(
            decoder_output,
            mask=decoder_mask,
            training=training
        )
        decoder_output: tf.Tensor = self.dropout_2(decoder_output, training=training)

        # Third LSTM layer
        decoder_output, _, _ = self.lstm_decoder_3(
            decoder_output,
            mask=decoder_mask,
            training=training
        )
        decoder_output: tf.Tensor = self.dropout_3(decoder_output, training=training)

        # Fourth LSTM layer
        decoder_output, final_state_h, final_state_c = self.lstm_decoder_4(
            decoder_output,
            mask=decoder_mask,
            training=training
        )
        decoder_output: tf.Tensor = self.dropout_4(decoder_output, training=training)

        # Extract only the encoder_mask from the mask list
        if mask is not None and isinstance(mask, (list, tuple)):
            encoder_mask = mask[1]
        else:
            encoder_mask = mask

        # Apply attention
        context_vector, attention_weights = self.attention(
            inputs=[encoder_output, decoder_output],
            mask=encoder_mask
        )

        # Concatenate decoder outputs and context vector
        concat_output: tf.Tensor = tf.concat([decoder_output, context_vector], axis=-1)  # (batch_size, seq_len_dec, units + units_enc)

        # Generate outputs
        decoder_output: tf.Tensor = self.dense(concat_output)  # (batch_size, seq_len_dec, vocab_size)

        return decoder_output

    @staticmethod
    def compute_mask(inputs: Any, mask: Optional[Any] = None) -> None:
        return None

    def get_config(self) -> dict:
        config = super(StackedLSTMDecoder, self).get_config()
        config.update({
            'vocab_size': self.vocab_size,
            'decoder_embedding_dim': self.embedding.output_dim,
            'units': self.units,
            'dropout_rate': self.dropout_rate,
            'embedding': tf.keras.layers.serialize(self.embedding),
            'lstm_decoder_1': tf.keras.layers.serialize(self.lstm_decoder_1),
            'dropout_1': tf.keras.layers.serialize(self.dropout_1),
            'lstm_decoder_2': tf.keras.layers.serialize(self.lstm_decoder_2),
            'dropout_2': tf.keras.layers.serialize(self.dropout_2),
            'lstm_decoder_3': tf.keras.layers.serialize(self.lstm_decoder_3),
            'dropout_3': tf.keras.layers.serialize(self.dropout_3),
            'lstm_decoder_4': tf.keras.layers.serialize(self.lstm_decoder_4),
            'dropout_4': tf.keras.layers.serialize(self.dropout_4),
            'attention': tf.keras.layers.serialize(self.attention),
            'dense': tf.keras.layers.serialize(self.dense),
        })
        return config

    @classmethod
    def from_config(cls, config: dict) -> 'StackedLSTMDecoder':
        # Deserialize layers
        config['embedding'] = tf.keras.layers.deserialize(config['embedding'])
        config['lstm_decoder_1'] = tf.keras.layers.deserialize(config['lstm_decoder_1'])
        config['dropout_1'] = tf.keras.layers.deserialize(config['dropout_1'])
        config['lstm_decoder_2'] = tf.keras.layers.deserialize(config['lstm_decoder_2'])
        config['dropout_2'] = tf.keras.layers.deserialize(config['dropout_2'])
        config['lstm_decoder_3'] = tf.keras.layers.deserialize(config['lstm_decoder_3'])
        config['dropout_3'] = tf.keras.layers.deserialize(config['dropout_3'])
        config['lstm_decoder_4'] = tf.keras.layers.deserialize(config['lstm_decoder_4'])
        config['dropout_4'] = tf.keras.layers.deserialize(config['dropout_4'])
        config['attention'] = tf.keras.layers.deserialize(config['attention'])
        config['dense'] = tf.keras.layers.deserialize(config['dense'])
        return cls(**config)

Attention layer

import tensorflow as tf
from tensorflow.keras.layers import Layer, Dense
from typing import List, Optional, Tuple, Union


class BahdanauAttention(Layer):
    def __init__(self, units: int, **kwargs):
        super(BahdanauAttention, self).__init__(**kwargs)
        self.units: int = units
        self.attention_dense1: Dense = Dense(units, name='attention_dense1')
        self.attention_dense2: Dense = Dense(units, name='attention_dense2')
        self.attention_v: Dense = Dense(1, name='attention_v')
        self.supports_masking: bool = True

    def call(self, inputs: List[tf.Tensor], mask: Optional[tf.Tensor] = None,
             training: Union[None, bool] = None) -> Tuple[tf.Tensor, tf.Tensor]:
        # Unpack inputs
        encoder_output, decoder_output = inputs

        # Attention Mechanism
        # Calculate attention scores
        # Expand dimensions to match the shapes for broadcasting
        # encoder_output_expanded shape: (batch_size, 1, seq_len_encoder, units*2)
        encoder_output_expanded: tf.Tensor = tf.expand_dims(encoder_output, 1)
        # decoder_output_expanded shape: (batch_size, seq_len_decoder, 1, units)
        decoder_output_expanded: tf.Tensor = tf.expand_dims(decoder_output, 2)

        # Compute the attention scores
        score: tf.Tensor = tf.nn.tanh(
            self.attention_dense1(encoder_output_expanded) + self.attention_dense2(decoder_output_expanded)
        )  # Shape: (batch_size, seq_len_decoder, seq_len_encoder, units)

        # Apply mask if available
        if mask is not None:
            # If mask is a list or tuple, both encoder and decoder mask have been passed.
            # Extract the encoder mask
            if isinstance(mask, (list, tuple)):
                encoder_mask: tf.Tensor = mask[0]
            else:
                encoder_mask = mask
            if encoder_mask is not None:
                # mask shape: (batch_size, seq_len_encoder)
                # Expand mask to match score dimensions
                encoder_mask = tf.cast(tf.expand_dims(encoder_mask, 1), dtype=score.dtype)  # (batch_size, 1, seq_len_encoder)
                encoder_mask = tf.expand_dims(encoder_mask, -1)  # (batch_size, 1, seq_len_encoder, 1)
                # Add a large negative value to masked positions to nullify their effect after softmax
                score += (1.0 - encoder_mask) * -1e9

        # attention_weights shape: (batch_size, seq_len_decoder, seq_len_encoder, 1)
        attention_weights: tf.Tensor = tf.nn.softmax(self.attention_v(score), axis=2)

        # Compute the context vector
        context_vector: tf.Tensor = attention_weights * encoder_output_expanded  # Shape: (batch_size, seq_len_decoder, seq_len_encoder, units*2)
        context_vector: tf.Tensor = tf.reduce_sum(context_vector, axis=2)  # Shape: (batch_size, seq_len_decoder, units*2)

        return context_vector, attention_weights

    @staticmethod
    def compute_mask(inputs: List[tf.Tensor], mask: Optional[tf.Tensor] = None) -> None:
        # This layer does not propagate the mask further
        return None

    def get_config(self) -> dict:
        config = super(BahdanauAttention, self).get_config()
        config.update({
            'units': self.units,
            'attention_dense1': tf.keras.layers.serialize(self.attention_dense1),
            'attention_dense2': tf.keras.layers.serialize(self.attention_dense2),
            'attention_v': tf.keras.layers.serialize(self.attention_v),
        })
        return config

    @classmethod
    def from_config(cls, config: dict) -> 'BahdanauAttention':
        # Deserialize layers
        config['attention_dense1'] = tf.keras.layers.deserialize(config['attention_dense1'])
        config['attention_dense2'] = tf.keras.layers.deserialize(config['attention_dense2'])
        config['attention_v'] = tf.keras.layers.deserialize(config['attention_v'])
        return cls(**config)

Seq2Seq model

import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense
from typing import Optional, Any, Tuple

# StackedBidirectionalLSTMEncoder and StackedLSTMDecoder are defined in the
# encoder and decoder sections above and must be importable here.

class RetrosynthesisSeq2SeqModel(Model):
    def __init__(self, input_vocab_size: int, output_vocab_size: int, encoder_embedding_dim: int,
                 decoder_embedding_dim: int, units: int, dropout_rate: float = 0.2, *args, **kwargs):
        super(RetrosynthesisSeq2SeqModel, self).__init__(*args, **kwargs)

        # Save the number of units (neurons)
        self.units: int = units

        # Encoder layer
        self.encoder: StackedBidirectionalLSTMEncoder = StackedBidirectionalLSTMEncoder(
            input_vocab_size, encoder_embedding_dim, units, dropout_rate
        )

        # Decoder layer
        self.decoder: StackedLSTMDecoder = StackedLSTMDecoder(
            output_vocab_size, decoder_embedding_dim, units, dropout_rate
        )

        # Save the vocabulary sizes
        self.input_vocab_size: int = input_vocab_size
        self.output_vocab_size: int = output_vocab_size

        # Mapping encoder final states to decoder initial states
        self.enc_state_h: Dense = Dense(units, name='enc_state_h')
        self.enc_state_c: Dense = Dense(units, name='enc_state_c')

        # Store the data processors (to be set externally)
        self.encoder_data_processor: Optional[Any] = None
        self.decoder_data_processor: Optional[Any] = None

        # Save the dropout rate
        self.dropout_rate: float = dropout_rate

    def build(self, input_shape):
        # Define the input shapes for encoder and decoder
        encoder_input_shape, decoder_input_shape = input_shape

        # Pass a dummy input through encoder and decoder to initialize weights
        encoder_dummy = tf.zeros(encoder_input_shape)
        decoder_dummy = tf.zeros(decoder_input_shape)

        # Forward pass to build the model
        self.call((encoder_dummy, decoder_dummy), training=False)

        # Mark the model as built
        super(RetrosynthesisSeq2SeqModel, self).build(input_shape)

    def call(self, inputs: Tuple[tf.Tensor, tf.Tensor], training: Optional[bool] = None) -> tf.Tensor:
        """
        Forward pass of the Seq2Seq model.

        Args:
            inputs (Tuple[tf.Tensor, tf.Tensor]): Tuple containing encoder and decoder inputs.
            training (Optional[bool], optional): Training flag. Defaults to None.

        Returns:
            tf.Tensor: The output predictions from the decoder.
        """
        # Extract encoder and decoder inputs
        encoder_input, decoder_input = inputs

        # Encoder
        encoder_output, state_h, state_c = self.encoder.call(encoder_input, training=training)

        # Map encoder final states to decoder initial states
        decoder_initial_state_h: tf.Tensor = self.enc_state_h(state_h)  # (batch_size, units)
        decoder_initial_state_c: tf.Tensor = self.enc_state_c(state_c)  # (batch_size, units)
        decoder_initial_state: Tuple[tf.Tensor, tf.Tensor] = (decoder_initial_state_h, decoder_initial_state_c)

        # Prepare decoder inputs as a tuple
        decoder_inputs: Tuple[tf.Tensor, Tuple[tf.Tensor, tf.Tensor], tf.Tensor] = (
            decoder_input,
            decoder_initial_state,
            encoder_output
        )

        # Extract encoder mask
        encoder_mask: Optional[tf.Tensor] = self.encoder.compute_mask(encoder_input)

        # Decoder
        output: tf.Tensor = self.decoder.call(
            decoder_inputs,
            training=training,
            mask=encoder_mask
        )

        return output

    def get_config(self) -> dict:
        config = super(RetrosynthesisSeq2SeqModel, self).get_config()
        config.update({
            'units': self.units,
            'input_vocab_size': self.input_vocab_size,
            'output_vocab_size': self.output_vocab_size,
            'encoder_embedding_dim': self.encoder.embedding.output_dim,
            'decoder_embedding_dim': self.decoder.embedding.output_dim,
            'dropout_rate': self.dropout_rate,
            'encoder': tf.keras.layers.serialize(self.encoder),
            'decoder': tf.keras.layers.serialize(self.decoder),
            'enc_state_h': tf.keras.layers.serialize(self.enc_state_h),
            'enc_state_c': tf.keras.layers.serialize(self.enc_state_c)
        })
        return config

    @classmethod
    def from_config(cls, config: dict) -> 'RetrosynthesisSeq2SeqModel':
        # Deserialize layers
        config['encoder'] = tf.keras.layers.deserialize(config['encoder'])
        config['decoder'] = tf.keras.layers.deserialize(config['decoder'])
        config['enc_state_h'] = tf.keras.layers.deserialize(config['enc_state_h'])
        config['enc_state_c'] = tf.keras.layers.deserialize(config['enc_state_c'])
        return cls(**config)

Minimal reproducible example script

#!/usr/bin/env python3

import numpy as np
from tensorflow.keras.optimizers import Adam

# RetrosynthesisSeq2SeqModel (and the custom layers it uses) are defined in
# the sections above and must be importable here.

input_vocab_size = 1000
output_vocab_size = 1000
encoder_embedding_dim = 32
decoder_embedding_dim = 64
units = 128
dropout_rate = 0.2

model = RetrosynthesisSeq2SeqModel(
    input_vocab_size=input_vocab_size,
    output_vocab_size=output_vocab_size,
    encoder_embedding_dim=encoder_embedding_dim,
    decoder_embedding_dim=decoder_embedding_dim,
    units=units,
    dropout_rate=dropout_rate
)

encoder_input_shape = (1, 20)  # (batch_size, sequence_length)
decoder_input_shape = (1, 20)  # (batch_size, sequence_length)

model.build([encoder_input_shape, decoder_input_shape])

sample_encoder_input = np.random.randint(0, input_vocab_size, size=(1, 20))
sample_decoder_input = np.random.randint(0, output_vocab_size, size=(1, 20))

learning_rate: float = 0.0001
optimizer: Adam = Adam(learning_rate=learning_rate, clipnorm=5.0)

model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

output = model([sample_encoder_input, sample_decoder_input])
print("Model output shape:", output.shape)

model.save('minimal_seq2seq_model.keras')
print("Model saved successfully.")
python tensorflow machine-learning keras deep-learning
1 Answer

0 votes

I ran into this problem too, but without any custom layers. What fixed it for me was creating a fresh conda environment and downgrading to TensorFlow 2.12.0 and h5py 3.6.0.

conda create -n tf_env python=3.10
conda activate tf_env
conda install h5py=3.6.0
conda install tensorflow[and-cuda]

You can also specify the TensorFlow version explicitly, but for me the default install pulled in 2.12.0.
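For example, pinning both packages inside the fresh environment with pip (an alternative sketch; I used the conda defaults above):

pip install "tensorflow==2.12.0" "h5py==3.6.0"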

Hope this helps!
