我有一个
Thread
的子类,我在我的项目中使用它。在这个类中,我手动传入 ContextVar。但是,有时(一天一次或两次),我注意到子线程中的 ContextVar 未设置(恢复为默认值)。
class MyThread(Thread):
def __init__(
self,
group: None = None,
target: Callable[..., Any] | None = None,
name: str | None = None,
args: tuple[Any, ...] = (),
kwargs: dict[str, Any] | None = None,
*,
daemon: bool | None = None,
):
super().__init__(group=group, target=target, name=name, args=args, kwargs=kwargs, daemon=daemon)
self.my_resource = get_resource_info()
def run(self):
self._exception = None
try:
set_my_resource_info(self.my_resource.name, self.my_resource.kind)
self._return_value = super().run()
except BaseException as e:
self._exception = e
def join(self, timeout: float | None = None):
super().join(timeout)
if self._exception:
raise self._exception
return self._return_value
在另一个模块中我有:
@dataclass
class MyResourceInfo:
name: str
kind: str ="unknown"
resource_info: ContextVar[MyResourceInfo] = ContextVar(
'my_resource_info',
default=MyResourceInfo(name=get_default_resource_name()),
)
def set_resource_info(name: str, kind: str = 'unknown') -> Token[MyResourceInfo]:
return resource_info.set(MyResourceInfo(name=name, kind=kind))
为什么子线程中上下文变量会间歇性地恢复为默认值?
抱歉 - 我已经在评论中讨论了你可以做什么 - 但由于这可能会揭示 Python contextvar 实现中的严重错误,所以我真的尝试重现该问题。
我想出了以下脚本,并用不同的方式运行它 Fedora Linux 上的 Python 版本(3.10.7、3.13.0、3.13.0t), 还有 docker 中的 Python 3.10.15 (python:3.10-slim-bookworm 如上下文所示),尽可能快地从 1000 个线程旋转到 1_000_000 个线程,并且在几个不同的点和组合中添加延迟,而不是单个ContextVar 设置失败的时间。
(我也尝试过,如注释掉的行中所示,在 常规目标函数,使用默认的threading.Thread类)
contextkabum.py:
import threading
import contextvars
import time
import sys
var = contextvars.ContextVar("var", default=0)
errors = []
delay = sys.getswitchinterval() * 3
class T(threading.Thread):
def run(self, *args):
#time.sleep(delay)
var.set(42)
#time.sleep(delay)
#time.sleep(0.001)
return super().run(*args)
def target():
#var.set(42)
#time.sleep(0.001)
if (x:=var.get()) != 42:
errors.append(y:=(threading.current_thread(), time.time(), x))
print(y)
time.sleep(0.001)
def doit(n=1_000_000):
threads = []
for i in range(n):
threads.append(T(target=target))
for i, t in enumerate(threads):
t.start()
if not i % 30:
pass
#time.sleep(.01)
if not i % 100:
print(i)
for t in threads:
t.join()
print (errors)
doit()
Dockerfile:
from python:3.10-slim-bookworm
copy contextkabum.py /root
cmd python /root/contextkabum.py
正如工作环境影响的评论中所述,只需添加显式检查即可,这在总体上是多余的 不存在问题,并重新设置有问题的 ContextVar:
def run(self):
self._exception = None
try:
set_my_resource_info(self.my_resource.name, self.my_resource.kind)
if check_resource_is_default():
time.sleep(0.005)
set_my_resource_info(self.my_resource.name, self.my_resource.kind)
self._return_value = super().run()
except BaseException as e:
self._exception = e
def check_resource_is_default():
...