Asyncio测试使用loop.call_soon_threadsafe的函数

问题描述 投票:0回答:1

我具有以下功能:

def send_command(self, cmd):
    self.loop.call_soon_threadsafe(
        functools.partial(
            self._transport.write, str(cmd).encode() + b"\n"
        )
    )

被测系统(sut)是从asyncio.Protocol继承的类,该类将一些命令发送到套接字上的硬件。我必须使用线程,因为这是wxPython下GUI的一部分。最后,如果我调用self._transport.write,则代码在Linux上可以正常运行,但在Windows™上会崩溃。

运行测试时:

@pytest.mark.asyncio
async def test_send_command(self):
    self.sut._transport = Mock()
    self.sut.send_command("ook eek")
    assert self.sut._transport.write.called is True

我收到断言错误。永远不会调用self.sut._transport.write。如果我直接在函数中调用self._transport.write,则代码会在Windows™上崩溃,但测试可以顺利通过。

我在这里想念什么?

有人吗?…

当然,这不是一个极端的情况……

python-3.x linux windows python-asyncio python-multithreading
1个回答
0
投票

解决...

[C0之后,使用此:

reading and experimenting with even loops

绕过问题。当然,这意味着在Windows上使用次效率循环。 ☹PHA!

欢迎任何有更好解决方案的人使用虚假的互联网积分。

© www.soinside.com 2019 - 2024. All rights reserved.