python asyncio - cancelling a `to_thread` task won't stop the thread

Question (votes: 0, answers: 1)

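With the snippet below, I can't understand why `infiniteTask` is not cancelled (it keeps spamming "I'm still standing").

In debug mode, I can see that the `Task` stored in `unfinished` is indeed marked as cancelled, but the thread is evidently not cancelled or killed.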

Why isn't the thread killed when the wrapping task is cancelled? What should I do to stop the thread?

import time
import asyncio

def quickTask():
    time.sleep(1)

def infiniteTask():
    while True:
        time.sleep(1)
        print("I'm still standing")

async def main():
    finished, unfinished = await asyncio.wait({
            asyncio.create_task(asyncio.to_thread(quickTask)),
            asyncio.create_task(asyncio.to_thread(infiniteTask))
        },
        return_when = "FIRST_COMPLETED"
    )

    for task in unfinished:
        task.cancel()
    await asyncio.wait(unfinished)

    print("  finished : " + str(len(finished))) # print '1'
    print("unfinished : " + str(len(unfinished))) # print '1' 

    
asyncio.run(main())
python python-asyncio
1 Answer
9 votes

Cause

If we check the definition of `asyncio.to_thread()`:

# python310/Lib/asyncio/threads.py
# ...

async def to_thread(func, /, *args, **kwargs):
    """Asynchronously run function *func* in a separate thread.

    Any *args and **kwargs supplied for this function are directly passed
    to *func*. Also, the current :class:`contextvars.Context` is propagated,
    allowing context variables from the main thread to be accessed in the
    separate thread.

    Return a coroutine that can be awaited to get the eventual result of *func*.
    """
    loop = events.get_running_loop()
    ctx = contextvars.copy_context()
    func_call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, func_call)

It is actually just a wrapper around `loop.run_in_executor`.
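To make the wrapper relationship concrete, here is a minimal sketch that runs the same function both ways. The function `work` and its arguments are made up for illustration; only `asyncio.to_thread`, `loop.run_in_executor`, and `functools.partial` come from the standard library. Note that this sketch omits the `contextvars` propagation that `to_thread` adds.

```python
import asyncio
import functools
import threading


def work(x, *, scale=1):
    # Hypothetical workload: return the result and the name of the thread that ran it.
    return x * scale, threading.current_thread().name


async def main():
    loop = asyncio.get_running_loop()
    # High-level form: positional and keyword arguments are passed straight through.
    via_to_thread = await asyncio.to_thread(work, 21, scale=2)
    # Roughly what to_thread does internally: bind the arguments, then hand the
    # callable to the loop's default executor (None selects the default).
    bound = functools.partial(work, 21, scale=2)
    via_executor = await loop.run_in_executor(None, bound)
    return via_to_thread, via_executor


results = asyncio.run(main())
```

Both calls end up on a worker thread of the loop's default `ThreadPoolExecutor`, which is why the rest of this answer looks at how that executor handles cancellation.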

Next, let's look at how asyncio's own tests handle `run_in_executor`:

# python310/Lib/test/test_asyncio/test_events.py
# ...

class EventLoopTestsMixin:
    # ...

    def test_run_in_executor_cancel(self):
        called = False

        def patched_call_soon(*args):
            nonlocal called
            called = True

        def run():
            time.sleep(0.05)

        f2 = self.loop.run_in_executor(None, run)
        f2.cancel()
        self.loop.run_until_complete(
                self.loop.shutdown_default_executor())
        self.loop.close()
        self.loop.call_soon = patched_call_soon
        self.loop.call_soon_threadsafe = patched_call_soon
        time.sleep(0.4)
        self.assertFalse(called)

You can see that it waits for `self.loop.shutdown_default_executor()`.

Now let's see what that looks like.

# python310/Lib/asyncio/base_events.py
# ...

class BaseEventLoop(events.AbstractEventLoop):
    # ...

    async def shutdown_default_executor(self):
        """Schedule the shutdown of the default executor."""
        self._executor_shutdown_called = True
        if self._default_executor is None:
            return
        future = self.create_future()
        thread = threading.Thread(target=self._do_shutdown, args=(future,))
        thread.start()
        try:
            await future
        finally:
            thread.join()

    def _do_shutdown(self, future):
        try:
            self._default_executor.shutdown(wait=True)
            self.call_soon_threadsafe(future.set_result, None)
        except Exception as ex:
            self.call_soon_threadsafe(future.set_exception, ex)

Here we can see that it starts another thread to run `_do_shutdown`, which in turn calls `self._default_executor.shutdown` with `wait=True`.

So where is `shutdown` implemented?

# Python310/Lib/concurrent/futures/thread.py
# ...

class ThreadPoolExecutor(_base.Executor):
    # ...

    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._shutdown = True
            if cancel_futures:
                # Drain all work items from the queue, and then cancel their
                # associated futures.
                while True:
                    try:
                        work_item = self._work_queue.get_nowait()
                    except queue.Empty:
                        break
                    if work_item is not None:
                        work_item.future.cancel()

            # Send a wake-up to prevent threads calling
            # _work_queue.get(block=True) from permanently blocking.
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()

With `wait=True`, it simply joins every worker thread, that is, it waits for all of them to stop on their own.

Nowhere in any of this do we see an attempt to actually cancel a running thread.
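The same limit can be observed directly on `concurrent.futures`, without asyncio in the picture. The sketch below assumes a single-worker pool so that one job is running while a second is still queued; `blocker`, `started`, and `release` are illustrative names, not part of any library.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()   # set once the first job is actually running
release = threading.Event()   # lets the running job finish


def blocker():
    started.set()
    release.wait(5)           # stand-in for long blocking work
    return "done"


with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(blocker)   # picked up by the only worker
    queued = pool.submit(blocker)    # stays in the work queue
    started.wait(5)

    # Future.cancel() only succeeds while the work item has not started yet.
    running_cancelled = running.cancel()
    queued_cancelled = queued.cancel()

    release.set()
    running_result = running.result(timeout=5)
```

Here `running.cancel()` returns `False` and the job still runs to completion, while the queued job is cancelled before it ever starts. This matches the `cancel_futures` branch of `shutdown` shown above, which only drains items that are still in the queue.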

Quoting the Trio documentation:

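Cancellation is a tricky issue here, because neither Python nor the operating systems it runs on provide any general mechanism for cancelling an arbitrary synchronous function running in a thread. This function will always check for cancellation on entry, before starting the thread. But once the thread is running, there are two ways it can handle being cancelled:

  • If cancellable=False, the function ignores the cancellation and keeps going, just like if we had called sync_fn synchronously. This is the default behavior.
  • If cancellable=True, then this function immediately raises Cancelled. In this case the thread keeps running in the background - we just abandon it to do whatever it's going to do, and silently discard any return value or errors that it raises.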

So from all this we learn that there is no way to terminate, from the outside, an infinite loop running in a thread.
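In asyncio the observable behaviour resembles the "abandon" case: the task is cancelled immediately, but the thread keeps going. The following sketch shows this with a finite worker so that it terminates; `worker` and `finished` are illustrative names, and the sleep durations are arbitrary.

```python
import asyncio
import threading
import time

finished = threading.Event()  # set by the worker when it reaches its last line


def worker():
    time.sleep(0.3)           # stand-in for blocking work
    finished.set()


async def main():
    task = asyncio.create_task(asyncio.to_thread(worker))
    await asyncio.sleep(0.1)  # give the worker time to start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

    task_cancelled = task.cancelled()
    thread_still_running = not finished.is_set()

    # Wait (in another worker thread) for the abandoned worker to finish.
    await asyncio.to_thread(finished.wait, 2)
    return task_cancelled, thread_still_running, finished.is_set()


task_cancelled, thread_still_running, worker_completed = asyncio.run(main())
```

The task reports itself as cancelled while the worker is still in the middle of its sleep, and the worker then completes normally. With an infinite loop in place of `worker`, as in the question, the thread would simply never stop.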


Workaround

Now that we know we have to be more careful about what we run in a thread, we need a way to signal the thread that we want it to stop.

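For this we can use `threading.Event`.

(Originally I wrote this answer with `asyncio.Event`, but that class is not thread-safe. Since we are moving the function's execution to another thread, it is better not to use it.)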
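As an aside on the thread-safety point: if a worker thread ever does need to touch an `asyncio.Event` (for example to tell the loop that it has finished), one option is to schedule the call on the loop thread with `loop.call_soon_threadsafe`. This is a sketch under that assumption, with illustrative names `worker` and `done`; it is not part of the original answer.

```python
import asyncio
import time


def worker(loop, done):
    time.sleep(0.1)  # stand-in for blocking work
    # Do not call done.set() directly from this thread; ask the loop to run it.
    loop.call_soon_threadsafe(done.set)


async def main():
    loop = asyncio.get_running_loop()
    done = asyncio.Event()
    thread_task = asyncio.create_task(asyncio.to_thread(worker, loop, done))
    # The coroutine waits without blocking the event loop.
    await asyncio.wait_for(done.wait(), timeout=5)
    await thread_task
    return done.is_set()


signalled = asyncio.run(main())
```

The answer's own code, which follows, signals in the other direction (from the loop to the thread) and only needs `threading.Event`.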

import time
import asyncio
import threading


def blocking_func(event: threading.Event):
    while not event.is_set():
        time.sleep(1)
        print("I'm still standing")


async def main():
    event = threading.Event()
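    # Keep a reference so the task cannot be garbage-collected while running.
    task = asyncio.create_task(asyncio.to_thread(blocking_func, event))

    await asyncio.sleep(5)
    # Now let's stop: the worker sees the flag on its next check and returns.
    event.set()
    await task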

asyncio.run(main())

Because the worker checks the event on every loop iteration, the program now terminates normally:

I'm still standing
I'm still standing
I'm still standing
I'm still standing
I'm still standing
I'm still standing

Process finished with exit code 0
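To connect this back to the question, where the task is stopped with `task.cancel()`, the event can be set automatically whenever the awaiting coroutine is cancelled. The helper `run_stoppable` below is hypothetical, not an asyncio API. The sketch also swaps `time.sleep` for `Event.wait(timeout)`, so the worker wakes up as soon as the flag is set instead of finishing its sleep first.

```python
import asyncio
import threading


def blocking_func(stop: threading.Event, ticks: list):
    # Event.wait(timeout) returns False on timeout and True once the event is set.
    while not stop.wait(0.05):
        ticks.append("I'm still standing")


async def run_stoppable(func, *args):
    """Hypothetical helper: run func(stop, *args) in a thread; set stop on exit."""
    stop = threading.Event()
    try:
        return await asyncio.to_thread(func, stop, *args)
    finally:
        # Runs on normal completion, on errors, and on cancellation alike.
        stop.set()


async def main():
    ticks = []
    task = asyncio.create_task(run_stoppable(blocking_func, ticks))
    await asyncio.sleep(0.3)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    count_at_cancel = len(ticks)
    await asyncio.sleep(0.3)  # a thread that was still alive would keep appending
    return count_at_cancel, len(ticks)


before, after = asyncio.run(main())
```

This only works because `blocking_func` cooperates by checking the event. A function that never checks it, such as `infiniteTask` in the question, still cannot be stopped this way.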