我只是想按照 tensorflow 文档中的 MultiWorkerMirroredStrategy 的例子。 我在具有单个节点的 localhost 中成功训练。 但是,我在有两个节点的集群中训练失败。 我试过禁用防火墙,但没有解决问题。
这是main.py。 (我在节点 1 和节点 2 中运行相同的代码,除了 tf_config 变量。我将节点 1 的 tf_config 设置为
tf_config['task']['index']=0
,将节点 2 的 tf_config 设置为tf_config['task']['index']=1
)
任何帮助表示赞赏。谢谢。
我看到你没有错误代码,但我想我可以推断出问题可能出现在哪里,因为你的代码应该可以工作。一旦有机会,我将在我的 kubernetes 上进行测试(我有一个节点关闭 atm)。
最有可能的问题。您正在使用 json.dumps() 来设置环境变量。在许多设置中你应该使用:
tf_config=json.loads(os.environ.get(TF_CONFIG) 或 '{}'), TASK_INDEX=tf_config['任务']['索引']
这应该可以解决公开端口和 ip 配置的任何问题。
-听起来你用的方法在笔记本上?因为您没有为 main.py 运行相同的代码。就像在一个 main.py 中一样,您设置 1 而另一个设置 0。无论哪种方式,这都不是您在这里所做的。您将索引设置为 1 和 0,但您不仅返回索引,还返回完整的集群规范以及您设置的索引。如果您的集群未设置环境变量,您将需要取回已设置的 TF_CONFIG,然后使用负载将其设置为您的 tf_config,现在您将仅获得该节点的副本索引。
如果你使用的是笔记本,它需要连接到集群环境,否则你就是在为你的机器设置一个本地环境变量,而不是集群上的容器。考虑使用 Kubeflow 来管理它。
您可以在设置集群后从笔记本启动 配置操作,或将 TF_job 规范构建为定义节点规范的 YAML,然后使用该规范启动 pod。
无论哪种方式,集群都需要实际具有该配置,您应该能够在集群中加载环境,以便为每个节点分配一个索引,并且您正在从启动时设置的节点副本 ID 获取该索引节点并使用 YAML 或 json 字典指定。如果 kubernetes 上的 replica-index:{num} 与容器上的环境变量不匹配,则在本地容器中运行的本地设置环境对实际集群没有任何意义——这是在 pod 启动时分配的。
-尝试制作一个函数,该函数将返回每个工作人员的索引,以测试它是否在您的 kubernetes 仪表板上或从 kubectl 设置为相同的副本索引。确保将函数打印出来,以便您可以在 pod 日志中看到它。这将有助于调试。
-查看 pod 日志并查看 pod 是否连接到服务器,以及是否使用与您的集群兼容的任何通信规范:grcp/etc。您没有设置通信策略,但在大多数情况下它应该能够自动为您找到它(以防万一)。
-如果您能够启动 pod,请确保在再次尝试之前终止它们。一旦你掌握了他们的 python 管道 skd,kubeflow 将再次让你的事情变得更加容易。您可以将函数作为容器启动。您可以通过终止旧 pod 来构建一个清理操作。
-您应该考虑将 main.py 和任何其他支持模块加载到存储库(例如 dockerhub)中的图像上,以便容器可以加载图像。使用 Multiworker Strategy,每台机器都需要具有相同的数据才能正确分片。再次检查您的 pod 日志,看看它是否无法分片数据。
-您是否在具有不同 GPU 的本地机器上运行?如果是这样,您应该使用 Mirrored Strategy NOT multiworker。
所以有很多可能出错的地方与你的配置有关,你在运行什么?您是在使用 minikube,还是在拆分 CPU 内核?我可以提出一般改进建议,以帮助您的代码处理更多通信选项。此外,您似乎没有数据分发选项。您放置在策略上下文管理器范围内的唯一东西是模型定义和编译。看起来这两个节点也共享同一个端口。最好将 TF_config 设置为传入的环境变量
'tf_config = json.loads(os.environ.get('TF_CONFIG') 或 '{}')'
这是我过去使用的设置示例,它既可以通过 mini-kube 在多个 CPU 线程上运行,也可以在 Kubernetes 框架上运行。
模型替换为简单的 mnist 模型。
from __future__ import absolute_import, division, print_function
import argparse
import json
import os
import tensorflow_datasets as tfds
import tensorflow as tf
from tensorflow.keras import layers, models
def make_datasets_unbatched():
BUFFER_SIZE = 10000
# Scaling MNIST data from (0, 255] to (0., 1.]
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255
return image, label
datasets, _ = tfds.load(name='mnist', with_info=True, as_supervised=True)
return datasets['train'].map(scale).cache().shuffle(BUFFER_SIZE)
def build_and_compile_cnn_model():
model = models.Sequential()
model.add(
layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Conv2D(64, (3, 3), activation='relu'))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Conv2D(64, (3, 3), activation='relu'))
model.add(layers.Flatten())
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dense(10, activation='softmax'))
model.summary()
model.compile(optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
return model
def decay(epoch):
if epoch < 3: #pylint: disable=no-else-return
return 1e-3
if 3 <= epoch < 7:
return 1e-4
return 1e-5
def main(args):
# MultiWorkerMirroredStrategy creates copies of all variables in the model's
# layers on each device across all workers
# if your GPUs don't support NCCL, replace "communication" with another
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
communication=tf.distribute.experimental.CollectiveCommunication.NCCL)
BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
with strategy.scope():
ds_train = make_datasets_unbatched().batch(BATCH_SIZE).repeat()
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = \
tf.data.experimental.AutoShardPolicy.DATA
ds_train = ds_train.with_options(options)
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = build_and_compile_cnn_model()
# Define the checkpoint directory to store the checkpoints
checkpoint_dir = args.checkpoint_dir
# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")
# Function for decaying the learning rate.
# You can define any decay function you need.
# Callback for printing the LR at the end of each epoch.
class PrintLR(tf.keras.callbacks.Callback):
def on_epoch_end(self, epoch, logs=None): #pylint: disable=no-self-use
print('\nLearning rate for epoch {} is {}'.format(
epoch + 1, multi_worker_model.optimizer.lr.numpy()))
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir='./logs'),
tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
save_weights_only=True),
tf.keras.callbacks.LearningRateScheduler(decay),
PrintLR()
]
# Keras' `model.fit()` trains the model with specified number of epochs and
# number of steps per epoch. Note that the numbers here are for demonstration
# purposes only and may not sufficiently produce a model with good quality.
multi_worker_model.fit(ds_train,
epochs=10,
steps_per_epoch=70,
callbacks=callbacks)
# Saving a model
# Let `is_chief` be a utility function that inspects the cluster spec and
# current task type and returns True if the worker is the chief and False
# otherwise.
def is_chief():
return TASK_INDEX == 0
if is_chief():
model_path = args.saved_model_dir
else:
# Save to a path that is unique across workers.
model_path = args.saved_model_dir + '/worker_tmp_' + str(TASK_INDEX)
multi_worker_model.save(model_path)
if __name__ == '__main__':
os.environ['NCCL_DEBUG'] = 'INFO'
tfds.disable_progress_bar()
# to decide if a worker is chief, get TASK_INDEX in Cluster info
tf_config = json.loads(os.environ.get('TF_CONFIG') or '{}')
TASK_INDEX = tf_config['task']['index']
parser = argparse.ArgumentParser()
parser.add_argument('--saved_model_dir',
type=str,
required=True,
help='Tensorflow export directory.')
parser.add_argument('--checkpoint_dir',
type=str,
required=True,
help='Tensorflow checkpoint directory.')
parsed_args = parser.parse_args()
main(parsed_args)