从父线程传递到子线程时间歇性丢失ContextVar

问题描述 投票:0回答:1

我有一个

Thread
的子类,我在我的项目中使用它。在这个类中,我手动传入 ContextVar。但是,有时(一天一次或两次),我注意到子线程中的 ContextVar 未设置(恢复为默认值)。

class MyThread(Thread):
    def __init__(
        self,
        group: None = None,
        target: Callable[..., Any] | None = None,
        name: str | None = None,
        args: tuple[Any, ...] = (),
        kwargs: dict[str, Any] | None = None,
        *,
        daemon: bool | None = None,
    ):
        super().__init__(group=group, target=target, name=name, args=args, kwargs=kwargs, daemon=daemon)
        self.my_resource = get_resource_info()

    def run(self):
        self._exception = None
        try:
            set_my_resource_info(self.my_resource.name, self.my_resource.kind)
            self._return_value = super().run()
        except BaseException as e:
            self._exception = e

    def join(self, timeout: float | None = None):
        super().join(timeout)
        if self._exception:
            raise self._exception
        return self._return_value

在另一个模块中我有:

@dataclass
class MyResourceInfo:
    name: str
    kind: str ="unknown"


resource_info: ContextVar[MyResourceInfo] = ContextVar(
    'my_resource_info',
    default=MyResourceInfo(name=get_default_resource_name()),
)

def set_resource_info(name: str, kind: str = 'unknown') -> Token[MyResourceInfo]:
    return resource_info.set(MyResourceInfo(name=name, kind=kind))

为什么子线程中上下文变量会间歇性地恢复为默认值?

python python-3.x multithreading python-multithreading python-contextvars
1个回答
0
投票

我无法重现该问题。

抱歉 - 我已经在评论中讨论了你可以做什么 - 但由于这可能会揭示 Python contextvar 实现中的严重错误,所以我真的尝试重现该问题。

我想出了以下脚本,并用不同的方式运行它 Fedora Linux 上的 Python 版本(3.10.7、3.13.0、3.13.0t), 还有 docker 中的 Python 3.10.15 (python:3.10-slim-bookworm 如上下文所示),尽可能快地从 1000 个线程旋转到 1_000_000 个线程,并且在几个不同的点和组合中添加延迟,而不是单个ContextVar 设置失败的时间。

(我也尝试过,如注释掉的行中所示,在 常规目标函数,使用默认的threading.Thread类)

contextkabum.py:

import threading
import contextvars
import time
import sys

var = contextvars.ContextVar("var", default=0)
errors = []
delay = sys.getswitchinterval() * 3


class T(threading.Thread):
    def run(self, *args):
        #time.sleep(delay)
        var.set(42)
        #time.sleep(delay)
        #time.sleep(0.001)
        return super().run(*args)


def target():
    #var.set(42)
    #time.sleep(0.001)
    if (x:=var.get()) != 42:
        errors.append(y:=(threading.current_thread(), time.time(), x))
        print(y)
    time.sleep(0.001)

def doit(n=1_000_000):
    threads = []
    for i in range(n):
        threads.append(T(target=target))
    for i, t in enumerate(threads):
        t.start()
        if not i % 30:
            pass
            #time.sleep(.01)
        if not i % 100:
            print(i)
    for t in threads:
        t.join()
    print (errors)


doit()

Dockerfile:

from python:3.10-slim-bookworm

copy contextkabum.py /root 

cmd python /root/contextkabum.py

解决方法:

正如工作环境影响的评论中所述,只需添加显式检查即可,这在总体上是多余的 不存在问题,并重新设置有问题的 ContextVar:


    def run(self):
        self._exception = None
        try:
            set_my_resource_info(self.my_resource.name, self.my_resource.kind)
            if check_resource_is_default():
                  time.sleep(0.005)
                              set_my_resource_info(self.my_resource.name, self.my_resource.kind)

            self._return_value = super().run()
        except BaseException as e:
            self._exception = e

def check_resource_is_default():
    ...
© www.soinside.com 2019 - 2024. All rights reserved.