asyncio:等待来自其他线程的事件

问题描述 投票:0回答:4

我正在用Python设计一个应用程序,它应该访问机器来执行一些(冗长的)任务。对于与网络相关的所有内容来说,asyncio 模块似乎都是一个不错的选择,但现在我需要访问一个特定组件的串行端口。我已经为实际的串行端口内容实现了某种抽象层,但无法弄清楚如何将其与 asyncio 合理地集成。

以下设置:我有一个线程运行一个循环,它定期与机器对话并解码响应。使用方法

enqueue_query()
,我可以将查询字符串放入队列中,然后该队列将由另一个线程发送到机器并引起响应。通过传入
threading.Event
(或具有
set()
方法的任何内容),调用者可以执行阻塞等待响应。这看起来像这样:

f = threading.Event()
ch.enqueue_query('2 getnlimit', f)
f.wait()
print(ch.get_query_responses())

我现在的目标是将这些行放入协程中,并让 asyncio 处理此等待,以便应用程序可以同时执行其他操作。我怎么能这样做呢?它可能会通过将

f.wait()
包装到执行器中来工作,但这似乎有点愚蠢,因为这会创建一个新线程,只是为了等待另一个线程执行某些操作。

python events python-multithreading python-asyncio
4个回答
21
投票

通过传入

threading.Event
(或具有
set()
方法的任何内容),调用者可以执行阻塞等待响应。

鉴于查询函数的上述行为,您所需要的只是

asyncio.Event
的线程安全版本。只需 3 行代码:

import asyncio
class Event_ts(asyncio.Event):
    #TODO: clear() method
    def set(self):
        #FIXME: The _loop attribute is not documented as public api!
        self._loop.call_soon_threadsafe(super().set)

功能测试:

def threaded(event):
    import time
    while True:
        event.set()
        time.sleep(1)

async def main():
    import threading
    e = Event_ts()
    threading.Thread(target=threaded, args=(e,)).start()
    while True:
        await e.wait()
        e.clear()
        print('whatever')

asyncio.ensure_future(main())
asyncio.get_event_loop().run_forever()

7
投票

最简单的方法是完全按照您的建议进行操作 - 将对

f.wait()
的调用包装在执行器中:

@asyncio.coroutine
def do_enqueue():
    f = threading.Event()
    ch.enqueue_query('2 getnlimit', f)
    yield from loop.run_in_executor(None, f.wait)
    print(ch.get_query_responses())

您确实会产生启动线程池的开销(至少对于第一次调用,该池将从该点开始保留在内存中),但是任何提供类似

threading.Event()
的实现并具有线程安全阻塞的解决方案而非阻塞 API,如果内部不依赖任何后台线程,将会需要更多的工作。


7
投票

Huazuo Gau 的答案中的类

Event_ts
在 Python 3.9 之前运行良好,但在 3.10 中不行。这是因为在Python 3.10中私有属性
_loop
最初是
None

以下代码适用于 Python 3.10 以及 3.9 及更低版本。 (我还添加了

clear()
方法。)

import asyncio
class Event_ts(asyncio.Event):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self._loop is None:
            self._loop = asyncio.get_event_loop()

    def set(self):
        self._loop.call_soon_threadsafe(super().set)

    def clear(self):
        self._loop.call_soon_threadsafe(super().clear)

0
投票

如果您需要有效的解决方案,您可以使用

aiologic.lowlevel.AsyncEvent
(我是aiologic的创建者):

event = aiologic.lowlevel.AsyncEvent()

Timer(1, event.set).start()

await event  # will take 1 second

它是作为

asyncio.Event
的轻量级版本在单一 future 上实现的,因此它具有速度优势。与此处显示的所有基于
call_soon_threadsafe()
的解决方案不同,它可以正确处理事件循环关闭时的情况:如果在
event.set()
完成后调用
asyncio.run()
Event_ts
将引发
RuntimeError

但是,

aiologic.lowlevel.AsyncEvent
仅在满足以下条件时才有效:

  1. 您在当前事件循环中创建一个实例。
  2. 只有一个任务正在等待该实例。

否则你可以使用

aiologic.Event
,由于额外的抽象层,它的速度不那么快,但即使在更复杂的情况下也能工作。如果您需要
cancel()
方法的支持,请查看
aiologic.REvent

© www.soinside.com 2019 - 2024. All rights reserved.