当我使用
sagemaker.tensorflow.TensorFlow.fit()
而不是 Pipe
作为 File
时,对 input_mode
的调用无限期挂起,并且没有错误消息。我相应地将 TensorFlow Dataset
替换为 Pipemodedataset
。 File
模式下的训练成功完成。
我的数据由两个 s3 存储桶组成,每个存储桶中有多个 tfrecord 文件。尽管广泛查看了文档,但我对在这种情况下如何使用
Pipemodedataset
没有信心 - 具体来说,如何设置 channel
。
这是我的 Sagemaker 笔记本设置:
hyperparameters = {
"batch-size": 1,
"pipe_mode": 1,
}
estimator_config = {
"entry_point": "tensorflow_train.py",
"source_dir": "source",
"framework_version": "2.3",
"py_version": "py37",
"instance_type": "ml.p3.2xlarge",
"instance_count": 1,
"role": sagemaker.get_execution_role(),
"hyperparameters": hyperparameters,
"output_path": f"s3://{bucket_name}",
"input_mode": "Pipe",
}
tf_estimator = TensorFlow(**estimator_config)
s3_data_channels = {
"training": f"s3://{bucket_name}/data/training",
"validation": f"s3://{bucket_name}/data/validation",
}
tf_estimator.fit(s3_data_channels)
如果我在
aws s3 ls
上运行 s3_data_channels
,我会得到 tfrecord 文件列表。
这是我设置数据集的方式(根据是否选择
pipe_mode
参见 if / else 语句:
import tensorflow as tf
if __name__ == "__main__":
arg_parser = argparse.ArgumentParser()
...
arg_parser.add_argument("--pipe_mode", type=int, default=0)
arg_parser.add_argument("--train_dir", type=str, default=os.environ.get("SM_CHANNEL_TRAINING"))
arg_parser.add_argument(
"--validation_dir", type=str, default=os.environ.get("SM_CHANNEL_VALIDATION")
)
arg_parser.add_argument("--model_dir", type=str)
args, _ = arg_parser.parse_known_args()
AUTOTUNE = tf.data.experimental.AUTOTUNE
if args.pipe_mode == 1:
from sagemaker_tensorflow import PipeModeDataset
train_ds = PipeModeDataset(channel="training", record_format='TFRecord')
val_ds = PipeModeDataset(channel="validation", record_format='TFRecord')
else:
train_files = tf.data.Dataset.list_files(args.train_dir + '/*tfrecord')
val_files = tf.data.Dataset.list_files(args.validation_dir + '/*tfrecord')
train_ds = tf.data.TFRecordDataset(filenames=train_files, num_parallel_reads=AUTOTUNE)
val_ds = tf.data.TFRecordDataset(filenames=val_files, num_parallel_reads=AUTOTUNE)
train_ds = (
train_ds.map(tfrecord_parser, num_parallel_calls=AUTOTUNE)
.batch(args.batch_size)
.prefetch(AUTOTUNE)
)
val_ds = (
val_ds.map(tfrecord_parser, num_parallel_calls=AUTOTUNE)
.batch(args.batch_size)
.prefetch(AUTOTUNE)
)
...
我遇到了同样的问题,使用管道模式时 model.fit() 无限期地卡住了。经过一些研究并尝试了许多更改后,通过在拟合模型时定义steps_per_epoch解决了这个问题。
我想当使用文件模式时它已经知道每个时期会有多少步,但是使用管道模式你必须手动指定它
我知道已经两年了。但这个问题在 Sagemaker 或 Keras 中从未得到解决。 其根本原因是 keras.engine.data_adapter.DataHandler.enumerate_epochs 以及 PipeModeDataset 的工作方式。
当 should_recreate_iterator 为 true 时,它将在 enumerate_epochs 中调用 iter(dataset) 两次,第一个 iterator 立即被丢弃。 并且由于 PipeModeDataset 的实现,每次创建迭代器时,它都会将索引增加到下一个 PIPE,在这种情况下,它将在第一个 epoch 中使用 PIPE training_1。但由于 PIPE training_0 未被消耗,后台获取进程不会写入 training_1。所以我们会找到训练挂起的地方。
用户也可以使用PipeModeDataset轻松生成它。只需致电
from sagemaker_tensorflow import PipeModeDataset
train_ds = PipeModeDataset(channel="training", record_format='TFRecord')
iter(ds)
next(iter(ds)) # it will wait forever
这是猴子补丁:
from tensorflow.python.distribute.input_lib import (
DistributedDataset,
)
def enumerate_epochs(self):
"""Yields `(epoch, tf.data.Iterator)`."""
with self._truncate_execution_to_epoch():
if not self._adapter.should_recreate_iterator():
data_iterator = iter(self._dataset)
for epoch in range(self._initial_epoch, self._epochs):
if self._insufficient_data: # Set by `catch_stop_iteration`.
break
if self._adapter.should_recreate_iterator():
data_iterator = iter(self._dataset)
if not isinstance(self._dataset, DistributedDataset):
steps = self._infer_steps(
self._steps_per_epoch, self._dataset
)
if steps is not None:
self._inferred_steps = steps
yield epoch, data_iterator
self._adapter.on_epoch_end()
import keras
keras.engine.data_adapter.DataHandler.enumerate_epochs = enumerate_epochs