以下代码启动 5 个多处理任务。这些任务打印一条消息,然后随机休眠一段时间,然后再次执行此操作。重复 10 次,因此整组任务最多应在 50 秒左右完成。
首次创建池时,会打印 5 个池对象的地址。
这段代码工作正常,但在完成所有工作之前就被终止了。再次运行时,您可以看到为 5 个池对象分配了新地址。
在完成之前停止操作的过程(键盘中断)几次之后,代码停止工作。任务永远不会被创建,代码会一直持续到最后。池对象的分配表明它们一遍又一遍地分配相同的对象,而在分配新对象地址之前。
因此看来操作系统资源正在被分配并且从未被释放。当这些资源全部用完时,代码就会停止工作。但即使它运行的命令提示符被终止(其窗口关闭),这些资源也不会被返回。当打开新的命令提示符并再次运行应用程序时,您可以看到分配了相同的对象 ID,任务从未启动,并且代码仍然失败。通常,代码会在 pool.join() 处停止,等待工作人员(任务)完成。
似乎没有办法让操作系统释放这些池资源。唯一的方法是重新启动操作系统。
在代码调试期间,不太可能让它运行完成并执行 pool.close() 和 poll.join() 。因此,在执行几次正在测试的代码后,不再分配任何任务。
因此问题是 - 我们如何确保生成的任务被终止并将其资源返回到池中?
import multiprocessing
import time
import random
# Simultaneous tasks running
def task(id):
for i in range(10):
stime = random.randint(1,5)
print(f"Task {id}: Woke up. Now sleep {stime}")
time.sleep(stime)
exit(0)
if __name__ == "__main__":
print ('create pool of tasks')
pool = multiprocessing.Pool(processes=5)
print (pool)
# Start 5 asynchronous tasks
for i in range(1,6):
result = pool.apply_async(task, i)
print (result)
pool.close()
# Wait for all tasks to complete
pool.join()
print ('end program')
问题在于
apply_async
采用可迭代的参数,而不是整数。通常使用错误参数得到的异常被封装在 AsyncResult
对象中。与“调用后不管”不同,通常会处理结果,以防在调用中或工作线程本身中引发异常。
在示例中,我通过了
[i]
并处理了结果。
import multiprocessing
import time
import random
# Simultaneous tasks running
def task(id):
for i in range(10):
stime = random.randint(1,5)
print(f"Task {id}: Woke up. Now sleep {stime}")
time.sleep(stime)
exit(0)
if __name__ == "__main__":
print ('create pool of tasks')
pool = multiprocessing.Pool(processes=5)
print (pool)
# Start 5 asynchronous tasks
results = []
for i in range(1,6):
result = pool.apply_async(task, [i])
print (result)
for result in results:
print(result.get())
pool.close()
# Wait for all tasks to complete
pool.join()