我有一个有趣的问题。
我正在使用tf.Estimator
对大型数据集(15M行,16列)进行回归,并使用常用方法将数据加载到tf.Dataset
:
def input_fn_train(features, labels, batch_size, repeat_count):
dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))
dataset = dataset.shuffle(len(labels)).repeat(repeat_count).batch(batch_size)
return dataset
features
和labels
是pandas DataFrames。 input_fn
适用于较小的数据(最多可达数百万行),但在包含整个数据集时,它会引发:
[libprotobuf FATAL external/protobuf_archive/src/google/protobuf/message_lite.cc:68] CHECK failed: (byte_size_before_serialization) == (byte_size_after_serialization): tensorflow.GraphDef was modified concurrently during serialization. terminate called after throwing an instance of 'google::protobuf::FatalException' what(): CHECK failed: (byte_size_before_serialization) == (byte_size_after_serialization): tensorflow.GraphDef was modified concurrently during serialization.
导致此错误的原因是,在显式数据(而不是占位符)上调用.from_tensor_slices()
时,TensorFlow会为每个数据点创建tf.constant()
。 TensorFlow中图形的大小存在固有的局限性,而且我的数据太大了。
在tensorflow文档中,他们提到了这一点,并提到了解决这个问题的方法:
“作为替代方案,您可以根据tf.placeholder()
张量定义数据集,并在数据集上初始化迭代器时提供NumPy数组。”
这个方法可以解决我的问题,但问题在于初始化,事实上,我无法访问它。在运行数据集迭代器的初始化操作时,无法将实际值提供给占位符。
使用以下钩子在tf.Estimator
中初始化数据集:
class _DatasetInitializerHook(training.SessionRunHook):
def __init__(self, iterator):
self._iterator = iterator
def begin(self):
self._initializer = self._iterator.initializer
def after_create_session(self, session, coord):
del coord
session.run(self._initializer)
如您所见,它在创建会话后立即被调用。问题是初始化程序会话运行独立于所有挂钩,因此在初始化会话运行时不会调用挂钩,因此,没有办法传递feed_dict
来填充占位符。
我无法自己初始化迭代器,因为没有办法将迭代器传递给Estimator
。迭代器在之后被初始化
解决这个问题的方法是明确地将我的数据分成TFRecord
文件并使用TensorFlow函数直接加载它们,但是,这是一个非常不受欢迎的解决方案。在我公司的代码库中,我们拥有自己的优化二进制数据格式,使用其他文件会占用大量空间和IO事务时间,这一点至关重要。
我认为我的问题有多种解决方案,但是,我仍然没有提出任何解决方案。如果您有任何想法或建议,如何做到这一点,请分享,谢谢!
好的,我找到了解决问题的方法。它可以使用Dataset.from_generator()
函数完成。我的解决方案使用一个生成器生成DataFrames,第二个生成行,同时迭代这些DataFrame。
a = arange(20).reshape(10,2)
df = DataFrame(a, columns=['x1','y1'])
def gen_partition():
for i in range(2):
df_partition = df.iloc[i * 5 : (i + 1) * 5]
yield df_partition
def gen_fields():
for partition in gen_partition(): # type: DataFrame
for row in partition.itertuples():
yield {'x1': row[1]}, row[2]
def input_fn_gen():
dataset = Dataset.from_generator(
gen_fields,
({'x1': tf.float32}, tf.float32),
({'x1': tf.TensorShape([])}, tf.TensorShape([])))
dataset = dataset.shuffle(20).repeat(20).batch(2).prefetch(1)
return dataset
feature_columns = [tf.feature_column.numeric_column('x1')]
dir = get_model_dir('linreg_test')
tf.logging.set_verbosity('INFO')
estimator = tf.estimator.LinearRegressor(
feature_columns=feature_columns,
model_dir=dir,
label_dimension=1
)
estimator.train(input_fn=lambda: input_fn_gen())