在实际集群中使用 MultiWorkerMirroredStrategy 时,Tensorflow 分布式学习不起作用

问题描述 投票:0回答:2

我只是想按照 tensorflow 文档中的 MultiWorkerMirroredStrategy 的例子。 我在具有单个节点的 localhost 中成功训练。 但是,我在有两个节点的集群中训练失败。 我试过禁用防火墙,但没有解决问题。

这是main.py。 (我在节点 1 和节点 2 中运行相同的代码,除了 tf_config 变量。我将节点 1 的 tf_config 设置为

tf_config['task']['index']=0
,将节点 2 的 tf_config 设置为
tf_config['task']['index']=1

main.py

任何帮助表示赞赏。谢谢。

python tensorflow deep-learning distributed
2个回答
0
投票

我看到你没有错误代码,但我想我可以推断出问题可能出现在哪里,因为你的代码应该可以工作。一旦有机会,我将在我的 kubernetes 上进行测试(我有一个节点关闭 atm)。

  • 最有可能的问题。您正在使用 json.dumps() 来设置环境变量。在许多设置中你应该使用:

    tf_config=json.loads(os.environ.get(TF_CONFIG) 或 '{}'), TASK_INDEX=tf_config['任务']['索引']

这应该可以解决公开端口和 ip 配置的任何问题。

-听起来你用的方法在笔记本上?因为您没有为 main.py 运行相同的代码。就像在一个 main.py 中一样,您设置 1 而另一个设置 0。无论哪种方式,这都不是您在这里所做的。您将索引设置为 1 和 0,但您不仅返回索引,还返回完整的集群规范以及您设置的索引。如果您的集群未设置环境变量,您将需要取回已设置的 TF_CONFIG,然后使用负载将其设置为您的 tf_config,现在您将仅获得该节点的副本索引。

  • 如果你使用的是笔记本,它需要连接到集群环境,否则你就是在为你的机器设置一个本地环境变量,而不是集群上的容器。考虑使用 Kubeflow 来管理它。

  • 您可以在设置集群后从笔记本启动 配置操作,或将 TF_job 规范构建为定义节点规范的 YAML,然后使用该规范启动 pod。

  • 无论哪种方式,集群都需要实际具有该配置,您应该能够在集群中加载环境,以便为每个节点分配一个索引,并且您正在从启动时设置的节点副本 ID 获取该索引节点并使用 YAML 或 json 字典指定。如果 kubernetes 上的 replica-index:{num} 与容器上的环境变量不匹配,则在本地容器中运行的本地设置环境对实际集群没有任何意义——这是在 pod 启动时分配的。

-尝试制作一个函数,该函数将返回每个工作人员的索引,以测试它是否在您的 kubernetes 仪表板上或从 kubectl 设置为相同的副本索引。确保将函数打印出来,以便您可以在 pod 日志中看到它。这将有助于调试。

-查看 pod 日志并查看 pod 是否连接到服务器,以及是否使用与您的集群兼容的任何通信规范:grcp/etc。您没有设置通信策略,但在大多数情况下它应该能够自动为您找到它(以防万一)。

-如果您能够启动 pod,请确保在再次尝试之前终止它们。一旦你掌握了他们的 python 管道 skd,kubeflow 将再次让你的事情变得更加容易。您可以将函数作为容器启动。您可以通过终止旧 pod 来构建一个清理操作。

-您应该考虑将 main.py 和任何其他支持模块加载到存储库(例如 dockerhub)中的图像上,以便容器可以加载图像。使用 Multiworker Strategy,每台机器都需要具有相同的数据才能正确分片。再次检查您的 pod 日志,看看它是否无法分片数据。

-您是否在具有不同 GPU 的本地机器上运行?如果是这样,您应该使用 Mirrored Strategy NOT multiworker。


0
投票

所以有很多可能出错的地方与你的配置有关,你在运行什么?您是在使用 minikube,还是在拆分 CPU 内核?我可以提出一般改进建议,以帮助您的代码处理更多通信选项。此外,您似乎没有数据分发选项。您放置在策略上下文管理器范围内的唯一东西是模型定义和编译。看起来这两个节点也共享同一个端口。最好将 TF_config 设置为传入的环境变量

'tf_config = json.loads(os.environ.get('TF_CONFIG') 或 '{}')'

这是我过去使用的设置示例,它既可以通过 mini-kube 在多个 CPU 线程上运行,也可以在 Kubernetes 框架上运行。

模型替换为简单的 mnist 模型。

from __future__ import absolute_import, division, print_function

import argparse
import json
import os

import tensorflow_datasets as tfds
import tensorflow as tf
from tensorflow.keras import layers, models


def make_datasets_unbatched():
  BUFFER_SIZE = 10000

  # Scaling MNIST data from (0, 255] to (0., 1.]
  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  datasets, _ = tfds.load(name='mnist', with_info=True, as_supervised=True)

  return datasets['train'].map(scale).cache().shuffle(BUFFER_SIZE)


def build_and_compile_cnn_model():
  model = models.Sequential()
  model.add(
      layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)))
  model.add(layers.MaxPooling2D((2, 2)))
  model.add(layers.Conv2D(64, (3, 3), activation='relu'))
  model.add(layers.MaxPooling2D((2, 2)))
  model.add(layers.Conv2D(64, (3, 3), activation='relu'))
  model.add(layers.Flatten())
  model.add(layers.Dense(64, activation='relu'))
  model.add(layers.Dense(10, activation='softmax'))

  model.summary()

  model.compile(optimizer='adam',
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy'])

  return model


def decay(epoch):
  if epoch < 3: #pylint: disable=no-else-return
    return 1e-3
  if 3 <= epoch < 7:
    return 1e-4
  return 1e-5


def main(args):

  # MultiWorkerMirroredStrategy creates copies of all variables in the model's
  # layers on each device across all workers
  # if your GPUs don't support NCCL, replace "communication" with another
  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
      communication=tf.distribute.experimental.CollectiveCommunication.NCCL)

  BATCH_SIZE_PER_REPLICA = 64
  BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

  with strategy.scope():
    ds_train = make_datasets_unbatched().batch(BATCH_SIZE).repeat()
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = \
        tf.data.experimental.AutoShardPolicy.DATA
    ds_train = ds_train.with_options(options)
    # Model building/compiling need to be within `strategy.scope()`.
    multi_worker_model = build_and_compile_cnn_model()

  # Define the checkpoint directory to store the checkpoints
  checkpoint_dir = args.checkpoint_dir

  # Name of the checkpoint files
  checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

  # Function for decaying the learning rate.
  # You can define any decay function you need.
  # Callback for printing the LR at the end of each epoch.
  class PrintLR(tf.keras.callbacks.Callback):

    def on_epoch_end(self, epoch, logs=None): #pylint: disable=no-self-use
      print('\nLearning rate for epoch {} is {}'.format(
        epoch + 1, multi_worker_model.optimizer.lr.numpy()))

  callbacks = [
      tf.keras.callbacks.TensorBoard(log_dir='./logs'),
      tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                                         save_weights_only=True),
      tf.keras.callbacks.LearningRateScheduler(decay),
      PrintLR()
  ]

  # Keras' `model.fit()` trains the model with specified number of epochs and
  # number of steps per epoch. Note that the numbers here are for demonstration
  # purposes only and may not sufficiently produce a model with good quality.
  multi_worker_model.fit(ds_train,
                         epochs=10,
                         steps_per_epoch=70,
                         callbacks=callbacks)

  # Saving a model
  # Let `is_chief` be a utility function that inspects the cluster spec and
  # current task type and returns True if the worker is the chief and False
  # otherwise.
  def is_chief():
    return TASK_INDEX == 0

  if is_chief():
    model_path = args.saved_model_dir

  else:
    # Save to a path that is unique across workers.
    model_path = args.saved_model_dir + '/worker_tmp_' + str(TASK_INDEX)

  multi_worker_model.save(model_path)


if __name__ == '__main__':
  os.environ['NCCL_DEBUG'] = 'INFO'

  tfds.disable_progress_bar()

  # to decide if a worker is chief, get TASK_INDEX in Cluster info
  tf_config = json.loads(os.environ.get('TF_CONFIG') or '{}')
  TASK_INDEX = tf_config['task']['index']

  parser = argparse.ArgumentParser()
  parser.add_argument('--saved_model_dir',
                      type=str,
                      required=True,
                      help='Tensorflow export directory.')

  parser.add_argument('--checkpoint_dir',
                      type=str,
                      required=True,
                      help='Tensorflow checkpoint directory.')

  parsed_args = parser.parse_args()
  main(parsed_args)
© www.soinside.com 2019 - 2024. All rights reserved.