我正在尝试同步多个线程。我期望使用 threading.Condition 和 threading.Barrier 时的脚本输出大致相同,但事实并非如此。请解释为什么会发生这种情况。
一般来说,我需要线程在无限循环中执行工作(一些IO操作),但是每个周期都是以主线程的权限开始的,并且只有在所有线程完成前一个周期后才给予权限。
脚本1
from threading import Barrier, Thread
from time import sleep, time
br = Barrier(3)
store = []
def f1():
while True:
br.wait()
sleep(1)
print("Calc part1")
def f2():
while True:
br.wait()
sleep(1)
print("Calc part2")
Thread(target=f1).start()
Thread(target=f2).start()
for i in range(10):
br.wait()
print(f'end iter {i}')
print(f'-------------')
预期行为
ent iter 0
-------------
Calc part1
Calc part2
ent iter 1
-------------
Calc part2
Calc part1
ent iter 2
-------------
Calc part1
...
脚本2
from threading import Condition, Thread
from time import sleep
condition = False
cv = Condition()
def predicate():
return condition
def f1():
for i in range(3):
with cv:
cv.wait_for(predicate)
sleep(1)
print("Calc part1")
def f2():
for i in range(3):
with cv:
cv.wait_for(predicate)
sleep(1)
print("Calc part2")
Thread(target=f1).start()
Thread(target=f2).start()
with cv:
condition = True
cv.notify_all()
意外行为
Calc part1
Calc part1
Calc part1
Calc part2
Calc part2
Calc part2
为什么线程的标准输出没有混合在第二个脚本的结果中?
您期望 threading 模块的原语是公平的,即等待时间较长的线程将首先获取底层锁。但不幸的是,他们不提供这样的保证,
threading.Condition
的不公平是一个已知问题。
在您的示例中,第一个线程退出
with cv
,但由于循环而再次进入,并且不会阻塞。原因有两点:
release()
-acquire()
这样的序列在这些调用之间切换的机会非常低,几乎不可能 - 第一个线程始终能够重新获取锁,从而剥夺了CPU 时间的第二个线程。cv.wait_for()
已经是 predicate()
,
True
永远不会释放锁定。如果你还想要一个公平的条件,你可以尝试
aiologic.Condition
(我是aiologic的创造者)。只需将 from threading import Condition
替换为 from aiologic import Condition
,您就会得到预期的输出:
Calc part1
Calc part2
Calc part1
Calc part2
Calc part1
Calc part2
但事情是这样的:到目前为止,
cv.wait_for()
是不公平的。如果您有一个谓词 True
经常出现,如您的示例所示,那么这不会成为问题,但在 CPython 问题的示例 中,资源匮乏仍然会发生。在 aiologic
的未来版本中它将变得公平。