我正在尝试在单台机器的两个 GPU 上部署深度学习模型,使用的是 TensorFlow 的 MirroredStrategy(镜像策略)。运行时出现以下错误:
Traceback (most recent call last):
代码
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import tensorflow as tf
import os
import json
# Rebind `tf` to the TF1 compatibility API. This replaces the `tf` name bound
# by the earlier `import tensorflow as tf`, so every `tf.*` reference below
# resolves against `tensorflow.compat.v1`.
import tensorflow.compat.v1 as tf
# Switch off TF2 behaviors so the TF1-style code in this script
# (tf.layers, tf.losses, tf.train optimizers, tf.app.run) is used as written.
tf.disable_v2_behavior()
# Set the TensorFlow logger to INFO verbosity.
tf.logging.set_verbosity(tf.logging.INFO)
from tensorflow.keras.datasets import mnist
def cnn_model_fn(features, labels, mode):
    """Build the MNIST CNN graph and return the EstimatorSpec for ``mode``.

    Architecture: conv(32, 5x5) -> maxpool(2) -> conv(64, 5x5) -> maxpool(2)
    -> dense(1024) -> dropout(0.4, TRAIN only) -> dense(10 logits).

    Args:
        features: Dict whose ``"x"`` entry holds the image batch; it is
            reshaped to ``[-1, 28, 28, 1]`` and cast to float32.
        labels: Integer class labels for the batch. Only read in TRAIN and
            EVAL modes; in PREDICT mode the Estimator supplies ``None``.
        mode: A ``tf.estimator.ModeKeys`` value selecting which spec to build.

    Returns:
        A ``tf.estimator.EstimatorSpec``: predictions only for PREDICT,
        loss + train_op for TRAIN, loss + accuracy metric for EVAL.
    """
    input_layer = tf.reshape(features["x"], [-1, 28, 28, 1])
    input_layer = tf.cast(input_layer, tf.float32)

    conv1 = tf.layers.conv2d(
        inputs=input_layer,
        filters=32,
        kernel_size=[5, 5],
        padding="same",
        activation=tf.nn.relu)
    pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2)

    conv2 = tf.layers.conv2d(
        inputs=pool1,
        filters=64,
        kernel_size=[5, 5],
        padding="same",
        activation=tf.nn.relu)
    pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2)

    # Two stride-2 pools shrink 28 -> 14 -> 7; conv2 produces 64 channels.
    pool2_flat = tf.reshape(pool2, [-1, 7 * 7 * 64])
    dense = tf.layers.dense(inputs=pool2_flat, units=1024, activation=tf.nn.relu)
    # Dropout is only active while training.
    dropout = tf.layers.dropout(
        inputs=dense, rate=0.4, training=mode == tf.estimator.ModeKeys.TRAIN)
    logits = tf.layers.dense(inputs=dropout, units=10)

    predictions = {
        # Generate predictions (for PREDICT and EVAL mode)
        "classes": tf.argmax(input=logits, axis=1),
        # Add `softmax_tensor` to the graph. It is used for PREDICT and by the
        # `logging_hook`.
        "probabilities": tf.nn.softmax(logits, name="softmax_tensor")
    }
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)

    # Cast labels only after the PREDICT early return: labels are None in
    # PREDICT mode and tf.cast cannot convert None to a tensor.
    labels = tf.cast(labels, tf.int32)

    # Calculate Loss (for both TRAIN and EVAL modes)
    loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)

    if mode == tf.estimator.ModeKeys.TRAIN:
        optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.001)
        train_op = optimizer.minimize(
            loss=loss,
            global_step=tf.train.get_global_step())
        return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)

    # Add evaluation metrics (for EVAL mode)
    eval_metric_ops = {
        "accuracy": tf.metrics.accuracy(
            labels=labels, predictions=predictions["classes"])}
    return tf.estimator.EstimatorSpec(
        mode=mode, loss=loss, eval_metric_ops=eval_metric_ops)
def per_device_batch_size(batch_size, num_gpus):
    """Return the batch size each GPU should process.

    Args:
        batch_size: Global batch size across all GPUs.
        num_gpus: Number of GPUs the batch is split over.

    Returns:
        ``batch_size`` unchanged when ``num_gpus <= 1``; otherwise
        ``batch_size`` divided evenly by ``num_gpus``, as an int.

    Raises:
        ValueError: If ``num_gpus > 1`` and ``batch_size`` is not a multiple
            of ``num_gpus``.
    """
    if num_gpus <= 1:
        return batch_size

    remainder = batch_size % num_gpus
    if remainder:
        err = ('When running with multiple GPUs, batch size '
               'must be a multiple of the number of available GPUs. Found {} '
               'GPUs with a batch size of {}; try --batch_size={} instead.'
               ).format(num_gpus, batch_size, batch_size - remainder)
        raise ValueError(err)

    # Floor division stays in exact integer arithmetic; true division would
    # round-trip through a float and lose precision for very large values.
    return int(batch_size // num_gpus)
class InputFnProvider:
    """Holds the MNIST arrays and exposes Estimator input functions.

    The data is loaded once at construction time; ``train_input_fn`` and
    ``eval_input_fn`` each build a ``tf.data.Dataset`` over the stored arrays.
    """

    def __init__(self, train_batch_size):
        # Batch size applied by train_input_fn.
        self.train_batch_size = train_batch_size
        self.__load_data()

    def __load_data(self):
        """Fetch MNIST via ``mnist.load_data()`` and store both splits."""
        train_split, eval_split = mnist.load_data()
        self.train_data, self.train_labels = train_split
        self.eval_data, self.eval_labels = eval_split

    def train_input_fn(self):
        """Return a shuffled, endlessly repeating, batched training dataset."""
        features = {"x": self.train_data}
        examples = tf.data.Dataset.from_tensor_slices(
            (features, self.train_labels))
        examples = examples.shuffle(1000)
        examples = examples.repeat()
        return examples.batch(self.train_batch_size)

    def eval_input_fn(self):
        """Return the evaluation dataset, one example per batch."""
        features = {"x": self.eval_data}
        examples = tf.data.Dataset.from_tensor_slices(
            (features, self.eval_labels))
        return examples.batch(1)
def main(unused_argv):
    """Train the MNIST CNN for 1000 steps, then evaluate and print the metrics.

    Args:
        unused_argv: Command-line arguments handed over by ``tf.app.run``;
            not read here.
    """
    global_batch_size = 100
    gpu_count = 2

    # Each replica gets its share of the global batch.
    provider = InputFnProvider(
        per_device_batch_size(global_batch_size, gpu_count))

    # Mirror across both GPUs only when more than one is requested.
    strategy = None
    if gpu_count > 1:
        strategy = tf.distribute.MirroredStrategy(
            devices=["/gpu:0", "/gpu:1"],
            cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())

    # The strategy reaches the Estimator through its RunConfig.
    run_config = tf.estimator.RunConfig(
        train_distribute=strategy,
        model_dir="/tmp/mnist_convnet_model")
    classifier = tf.estimator.Estimator(
        model_fn=cnn_model_fn,
        config=run_config)

    classifier.train(input_fn=provider.train_input_fn, steps=1000)

    metrics = classifier.evaluate(input_fn=provider.eval_input_fn)
    print(metrics)
# Script entry point: only start the app when executed directly, not on import.
if __name__ == "__main__":
    tf.app.run()
令人意外的是,用同样的代码在单个 GPU 上部署模型时一切正常;但当我修改代码、尝试在两个 GPU 上部署时,就出现了上述错误。我不清楚这个错误的原因,有人可以帮忙吗?
请参考此链接,了解如何在 Python 中创建并发线程池: https://docs.python.org/zh-tw/3/library/concurrent.futures.html
在 `concurrent.futures.ThreadPoolExecutor` 中,当代码块退出时,上下文管理器会自动关闭线程池。
回到我们的问题:MirroredStrategy 内部同样会创建一个线程池,但它不会在程序结束之前自动关闭该线程池。因此,我们必须在退出时显式关闭它,做法如下:
import atexit
strategy = tf.distribute.MirroredStrategy()
# Register the pool's close() so it runs when the interpreter exits.
# NOTE(review): this reaches into underscore-prefixed (private) attributes of
# the strategy; confirm they exist in the installed TensorFlow version.
atexit.register(strategy._extended._collective_ops._pool.close)
请参阅此处得票最多的答案: https://github.com/tensorflow/tensorflow/issues/50487