所以我刚开始使用Tensorflow,我正在努力正确理解输入管道。
我正在研究的问题是序列分类。我正在尝试读取形状为(100000,4)的CSV文件。前3列是功能,第4列是标签。但是 - 数据表示长度为10的序列,即行1-10是序列1,行11-20是序列2等。这也意味着每个标记重复10次。
所以在这个输入管道的某个点上,我需要重塑我的特征张量,如tf.reshape(features,[batch_size_,rows_per_ob,input_dim])。并且只取标签[:: rows_per_ob]的标签张量的每第10行
我应该指出的另一件事是我的实际数据集在数十亿行中,所以我必须考虑性能。
我在这里汇总了文档和其他帖子的下面代码,但我不认为我完全理解这一点,因为我看到以下错误:
INFO:tensorflow:向协调员报告错误:,尝试使用未初始化的值input_producer_2 / limit_epochs / epochs
似乎有一个超出范围的错误。
一旦我让它们工作,我也无法弄清楚如何处理这些批次。最初,我认为我会重塑它们然后只是将它们输入“feed_dict”,但后来我读到这非常糟糕,我应该使用tf.data.Dataset对象。但我不确定如何将这些批次提供给数据集。我还不完全确定在这个过程中重塑数据的最佳时间是什么时候?
最后一点是混淆 - 当你使用带有数据集对象的Iterator时,我看到我们使用了get_next()方法。这是否意味着数据集中的每个元素代表一整批数据?这是否意味着如果我们想要更改批量大小,我们需要重建整个数据集对象?
我真的很想把所有的东西都装在一起。如果有人对我有任何指示,我将非常感谢!谢谢!
# import
import tensorflow as tf
# constants
filename = "tensorflow_test_data.csv"
num_rows = 100000
rows_per_ob = 10
batch_size_ = 5
num_epochs_ = 2
num_batches = int(num_rows * num_epochs_ / batch_size_ / rows_per_ob)
# read csv line
def read_from_csv(filename_queue):
reader = tf.TextLineReader(skip_header_lines=1)
_, value = reader.read(filename_queue)
record_defaults = [[0.0], [0.0], [0.0], [0.0]]
a, b, c, d = tf.decode_csv(value, record_defaults=record_defaults)
features = tf.stack([a, b, c])
return features, d
def input_pipeline(filename=filename, batch_size=batch_size_, num_epochs=num_epochs_):
filename_queue = tf.train.string_input_producer([filename],
num_epochs=num_epochs,
shuffle=False)
x, y = read_from_csv(filename_queue)
x_batch, y_batch = tf.train.batch([x, y],
batch_size = batch_size * rows_per_ob,
num_threads=1,
capacity=10000)
return x_batch, y_batch
###
x, y = input_pipeline(filename, batch_size=batch_size_,
num_epochs = num_epochs_)
# I imagine using lists is wrong here - this was more just for me to
# see the output
x_list = []
y_list = []
with tf.Session() as sess:
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
for _ in range(num_batches):
x_batch, y_batch = sess.run([x, y])
x_list.append(x_batch)
y_list.append(y_batch)
coord.request_stop()
coord.join(threads)
您可以使用tf.data.Dataset
对象表达整个管道,这可能会使事情变得更容易:
dataset = tf.data.TextLineDataset(filename)
# Skip the header line.
dataset = dataset.skip(1)
# Combine 10 lines into a single observation.
dataset = dataset.batch(rows_per_ob)
def parse_observation(line_batch):
record_defaults = [[0.0], [0.0], [0.0], [0.0]]
a, b, c, d = tf.decode_csv(value, record_defaults=record_defaults)
features = tf.stack([a, b, c])
label = d[-1] # Take the label from the last row.
return features, label
# Parse each observation into a `row_per_ob X 2` matrix of features and a
# scalar label.
dataset = dataset.map(parse_observation)
# Batch multiple observations.
dataset = dataset.batch(batch_size)
# Optionally add a prefetch for performance.
dataset = dataset.prefetch(1)
要使用数据集中的值,您可以创建一个tf.data.Iterator
以将下一个元素作为一对tf.Tensor
对象,然后将它们用作模型的输入。
iterator = dataset.make_one_shot_iterator()
features_batch, label_batch = iterator.get_next()
# Use the `features_batch` and `label_batch` tensors as the inputs to
# the model, rather than fetching them and feeding them via the `Session`
# interface.
train_op = build_model(features_batch, label_batch)