如何在Estimator训练期间动态加载数据集的新部分?

问题描述 投票:3回答:1

我有一个有趣的问题。

我正在使用tf.Estimator对大型数据集(15M行,16列)进行回归,并使用常用方法将数据加载到tf.Dataset

def input_fn_train(features, labels, batch_size, repeat_count):

    dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))
    dataset = dataset.shuffle(len(labels)).repeat(repeat_count).batch(batch_size)
    return dataset

featureslabels是pandas DataFrames。 input_fn适用于较小的数据(最多可达数百万行),但在包含整个数据集时,它会引发:

[libprotobuf FATAL external/protobuf_archive/src/google/protobuf/message_lite.cc:68] CHECK failed: (byte_size_before_serialization) == (byte_size_after_serialization): tensorflow.GraphDef was modified concurrently during serialization. terminate called after throwing an instance of 'google::protobuf::FatalException' what(): CHECK failed: (byte_size_before_serialization) == (byte_size_after_serialization): tensorflow.GraphDef was modified concurrently during serialization.

导致此错误的原因是,在显式数据(而不是占位符)上调用.from_tensor_slices()时,TensorFlow会为每个数据点创建tf.constant()。 TensorFlow中图形的大小存在固有的局限性,而且我的数据太大了。

在tensorflow文档中,他们提到了这一点,并提到了解决这个问题的方法:

“作为替代方案,您可以根据tf.placeholder()张量定义数据集,并在数据集上初始化迭代器时提供NumPy数组。”

这个方法可以解决我的问题,但问题在于初始化,事实上,我无法访问它。在运行数据集迭代器的初始化操作时,无法将实际值提供给占位符。

使用以下钩子在tf.Estimator中初始化数据集:

class _DatasetInitializerHook(training.SessionRunHook):

    def __init__(self, iterator):
        self._iterator = iterator

    def begin(self):
        self._initializer = self._iterator.initializer

    def after_create_session(self, session, coord):
        del coord
        session.run(self._initializer)

如您所见,它在创建会话后立即被调用。问题是初始化程序会话运行独立于所有挂钩,因此在初始化会话运行时不会调用挂钩,因此,没有办法传递feed_dict来填充占位符。

我无法自己初始化迭代器,因为没有办法将迭代器传递给Estimator。迭代器在之后被初始化

解决这个问题的方法是明确地将我的数据分成TFRecord文件并使用TensorFlow函数直接加载它们,但是,这是一个非常不受欢迎的解决方案。在我公司的代码库中,我们拥有自己的优化二进制数据格式,使用其他文件会占用大量空间和IO事务时间,这一点至关重要。

我认为我的问题有多种解决方案,但是,我仍然没有提出任何解决方案。如果您有任何想法或建议,如何做到这一点,请分享,谢谢!

python tensorflow tensorflow-datasets tensorflow-estimator
1个回答
2
投票

好的,我找到了解决问题的方法。它可以使用Dataset.from_generator()函数完成。我的解决方案使用一个生成器生成DataFrames,第二个生成行,同时迭代这些DataFrame。

a = arange(20).reshape(10,2)
df = DataFrame(a, columns=['x1','y1'])


def gen_partition():
    for i in range(2):
        df_partition = df.iloc[i * 5 : (i + 1) * 5]
        yield df_partition


def gen_fields():
    for partition in gen_partition(): # type: DataFrame
        for row in partition.itertuples():
            yield {'x1': row[1]}, row[2]


def input_fn_gen():
    dataset = Dataset.from_generator(
        gen_fields,
        ({'x1': tf.float32}, tf.float32),
        ({'x1': tf.TensorShape([])}, tf.TensorShape([])))

    dataset = dataset.shuffle(20).repeat(20).batch(2).prefetch(1)
    return dataset


feature_columns = [tf.feature_column.numeric_column('x1')]

dir = get_model_dir('linreg_test')

tf.logging.set_verbosity('INFO')

estimator = tf.estimator.LinearRegressor(
    feature_columns=feature_columns,
    model_dir=dir,
    label_dimension=1
)

estimator.train(input_fn=lambda: input_fn_gen())
© www.soinside.com 2019 - 2024. All rights reserved.