我并行运行一个函数,每个工作线程都可以访问一个 ID 为 0 或 1 的 GPU。
def f(GPU_id, arg):
# Select GPU to use.
os.environ["CUDA_VISIBLE_DEVICES"]=str(GPU_id)
# Do something with arg.
假设我想评估
arg=[1, 2, 3, 4]
。
from multiprocessing import Pool
p = Pool(2)
for arg in [[1, 2], [3, 4]]:
# Call the function in parallel on GPU 0 and 1
p.starmap(f, zip([0, 1], arg))
但是现在,我想异步运行它(不确定这是否是正确的术语),这意味着工作人员不会等到其他人完成任务才开始下一个任务。因此,worker 需要保留 GPU_id 并从列表中获取下一个参数。
有什么想法可以做到这一点吗?
你面临的问题是每个进程必须存储它应该使用哪个GPU。目前,通过压缩,您可以手动选择在哪个 GPU 上处理哪个参数。这反过来会导致某些进程等待其他进程,因为它们被分配给同一个 GPU。
解决这个问题的方法是在使用map函数之前为每个CPU分配一个GPU ID。在这种情况下,您无需手动压缩 GPU ID 并让多处理模块处理调度。
为进程分配GPU ID可以通过查找当前CPU ID,然后使用映射获取相应的GPU ID来完成。为此,我们将使用 multiprocessing.current_process 函数来获取当前进程。获取进程后,我们需要获取一个唯一标识符,用于将 CPU ID 映射到 GPU ID。这可以使用 Process.name 属性来完成。如果你阅读文档,据说如果没有手动设置名称,所有进程都将被分配一个
Process-<ID>
形式的名称。因此,我们可以从进程名称中提取ID,并用它来获取唯一的GPU标识符。
import multiprocessing
# can also be a dictionary
gpu_id_list = [3, 5, 7, 10]
def function(x):
cpu_name = multiprocessing.current_process().name
cpu_id = int(cpu_name[cpu_name.find('-') + 1:]) - 1
gpu_id = gpu_id_list[cpu_id]
return x * gpu_id
if __name__ == '__main__':
pool = multiprocessing.Pool(4)
input_list = [1, 1, 1, 1]
print(pool.map(function, input_list))
打印
[3, 5, 7, 10]
意味着每个进程都被分配给自己的 GPU 标识符。
@eozd 的答案是正确的,但可以改进。
这将在给定数量的 GPU 之间均匀分配多个工作进程。多个工作人员可以共享一个 GPU(如果它们适合 GPU 的内存)。
import logging
import multiprocessing
import os
# logging just to get not mangled outputs
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
def get_process_index(process) -> int:
proc_name = multiprocessing.current_process().name
# eg. "ForkPoolWorker-10", can be numbered not from zero upon multiple Pool invocations,
# be the numbering should be contiguous
return int(proc_name.split("-")[1])
def initialize(gpus: list[str]):
if gpus:
proc_index = get_process_index(multiprocessing.current_process())
selected_gpu = gpus[proc_index % len(gpus)]
os.environ["CUDA_VISIBLE_DEVICES"] = str(selected_gpu)
logger.info(f"process id: {proc_index} -> GPU id: {selected_gpu}")
def work(i):
time.sleep(0.1)
logger.info(f"work item {i} on GPU {os.environ['CUDA_VISIBLE_DEVICES']}")
available_gpu_ids = [3, 5, 7]
with multiprocessing.Pool(processes=4, initializer=initialize, initargs=(available_gpu_ids,)) as pool:
pool.map(work, range(12))
输出示例:
INFO:__main__:process id: 17 -> GPU id: 7
INFO:__main__:process id: 18 -> GPU id: 3
INFO:__main__:process id: 19 -> GPU id: 5
INFO:__main__:process id: 20 -> GPU id: 7
INFO:__main__:work item 2 on GPU 5
INFO:__main__:work item 0 on GPU 7
INFO:__main__:work item 1 on GPU 3
INFO:__main__:work item 3 on GPU 7
INFO:__main__:work item 5 on GPU 7
INFO:__main__:work item 4 on GPU 5
INFO:__main__:work item 6 on GPU 3
INFO:__main__:work item 7 on GPU 7
INFO:__main__:work item 9 on GPU 7
INFO:__main__:work item 8 on GPU 5
INFO:__main__:work item 10 on GPU 3
INFO:__main__:work item 11 on GPU 7
注意:ML 模型(例如 ONNX 会话)可以在初始化函数中加载并存储在某些全局模块属性中,然后从工作函数中调用。