How can I run continuous/repeated tasks in an AsyncMachine without causing a deadlock?


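I'm trying to set up an asynchronous state machine with transitions that can be controlled over MQTT using aiomqtt. I managed to get a minimal working example running, and it works as long as there are no repeated actions: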

State machine script:

import asyncio
import aiomqtt
from transitions.extensions import AsyncMachine
import sys
import os
import logging
logging.basicConfig(level=logging.DEBUG)

if sys.platform.lower() == "win32" or os.name.lower() == "nt":
    from asyncio import set_event_loop_policy, WindowsSelectorEventLoopPolicy
    set_event_loop_policy(WindowsSelectorEventLoopPolicy())

class MQTTStateMachine:
    states = ['init','A','B',{'name': 'stopped', 'final':True}]

    def __init__(self,client):
        self.client = client

        self.machine = AsyncMachine(model=self, states=MQTTStateMachine.states, initial='init')
        self.machine.add_transition(trigger='init', source='init', dest='A')
        self.machine.add_transition(trigger='stop', source=['A','B'], dest='stopped')

    async def update_state(self):
        await self.client.publish("MQTTstatemachine/machine/state", str(self.state))
    
    async def receiveMQTT(self):
        await self.client.subscribe("MQTTstatemachine/controller/transition")
        async for message in self.client.messages:
            if message.topic.matches("MQTTstatemachine/controller/transition"):
                await self.trigger(message.payload.decode())

    async def on_enter_A(self):
        await self.update_state()
        print("I'm now in state A.")
    
    async def on_enter_B(self):
        await self.update_state()
        print("I'm now in state B.")
    
    async def on_enter_stopped(self):
        await self.update_state()
        print("I'm now in state stopped.")

async def main():
    async with aiomqtt.Client("test.mosquitto.org") as client:
        MQTTmachine = MQTTStateMachine(client)
        await MQTTmachine.init()
        await asyncio.create_task(MQTTmachine.receiveMQTT())

if __name__ == "__main__":
    asyncio.run(main())

Controller script:

import asyncio
import aiomqtt
import sys
import os
if sys.platform.lower() == "win32" or os.name.lower() == "nt":
    from asyncio import set_event_loop_policy, WindowsSelectorEventLoopPolicy
    set_event_loop_policy(WindowsSelectorEventLoopPolicy())

async def publishTransitions(client):
        await asyncio.sleep(5)
        await client.publish("MQTTstatemachine/controller/transition","to_B")
        print("Transition: to_B")
        await asyncio.sleep(5)
        await client.publish("MQTTstatemachine/controller/transition","to_A")
        print("Transition: to_A")
        await asyncio.sleep(5)
        await client.publish("MQTTstatemachine/controller/transition","to_B")
        print("Transition: to_B")
        await asyncio.sleep(5)
        await client.publish("MQTTstatemachine/controller/transition","stop")
        print("Transition: stop")
        await asyncio.sleep(5)

async def receiveStates(client):
        await client.subscribe("MQTTstatemachine/machine/state")
        async for message in client.messages:
                if message.topic.matches("MQTTstatemachine/machine/state"):
                        print(f"Statemachine now in state {message.payload.decode()}")

async def main():
        async with aiomqtt.Client("test.mosquitto.org") as client:
                tasks = [publishTransitions(client),receiveStates(client)]
                pending = [asyncio.create_task(t) for t in tasks]
                done, pending = await asyncio.wait(pending,return_when=asyncio.FIRST_COMPLETED)
              
                pendingTask = pending.pop()
                pendingTask.cancel()
                try:
                        await pendingTask
                except asyncio.CancelledError:
                        print("Finished.")

if __name__ == "__main__":
    asyncio.run(main())

I tried to perform a repeated action by replacing on_enter_B:

    async def on_enter_B(self):
        while self.is_B():
            await self.update_state()
            print("I'm now in state B.")
            await asyncio.sleep(1)

But then the machine gets stuck in state B and no longer responds to state changes sent over MQTT.

I also tried to implement the repeated task with a reflexive transition, but that doesn't work either:

    async def on_enter_B(self):
        await self.update_state()
        print("I'm now in state B.")
        await asyncio.sleep(1)
        await self.to_B()

Tags: mqtt, state-machine, pytransitions

1 answer

As far as I can tell, the problem is here:

    async def receiveMQTT(self):
        await self.client.subscribe("MQTTstatemachine/controller/transition")
        async for message in self.client.messages:
            if message.topic.matches("MQTTstatemachine/controller/transition"):
                await self.trigger(message.payload.decode())  # [1]

When any callback blocks, the call to self.trigger at [1] does not return, so the for loop never reaches the next message and receiveMQTT itself is blocked.
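
To illustrate the effect without MQTT or transitions, here is a minimal sketch using only asyncio. The names handle, consume and demo are hypothetical, the queue stands in for client.messages, and the long sleep stands in for a callback that loops while the machine is in state B:

```python
import asyncio

async def handle(name, log):
    log.append(f"start {name}")
    if name == "to_B":
        # Stand-in for a callback that keeps looping while in state B.
        await asyncio.sleep(3600)
    log.append(f"done {name}")

async def consume(queue, log):
    while True:
        name = await queue.get()
        # Awaiting the handler here means the loop cannot fetch the
        # next message until the handler returns.
        await handle(name, log)

async def demo():
    queue, log = asyncio.Queue(), []
    for name in ("to_B", "to_A"):
        queue.put_nowait(name)
    try:
        # Give the consumer a short time budget, then stop it.
        await asyncio.wait_for(consume(queue, log), timeout=0.2)
    except asyncio.TimeoutError:
        pass
    return log

blocked_log = asyncio.run(demo())
print(blocked_log)
```

The second message is still sitting in the queue when the timeout fires, which mirrors what happens to the MQTT messages in the question.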

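Solution: don't await trigger; run it as a task instead [2]. Keep track of the running tasks [3] so that they are not stopped by the garbage collector (see the Python documentation for details). When a new trigger arrives, AsyncMachine should cancel the running trigger. This invokes the done callback, which you can use to remove the reference from the task collection [4]. In my example self.tasks is a set (see [5]).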

    def __init__(self, client):
        self.client = client
        self.tasks = set()  # [5]

# ...

    async def receiveMQTT(self):
        await self.client.subscribe("MQTTstatemachine/controller/transition")
        async for message in self.client.messages:
            if message.topic.matches("MQTTstatemachine/controller/transition"):
                task = asyncio.create_task(self.trigger(message.payload.decode()))  # [2]
                self.tasks.add(task)  # [3]
                task.add_done_callback(self.tasks.discard)  # [4]
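
The same bookkeeping can be tried out in isolation with plain asyncio. This is only a sketch under assumptions: Dispatcher, handle and dispatch are hypothetical names, handle stands in for self.trigger with a looping on_enter callback, and the previous task is cancelled by hand here, whereas in the real setup AsyncMachine is expected to do that when the next trigger arrives.

```python
import asyncio

class Dispatcher:
    """Hypothetical stand-in for the model; not part of transitions."""

    def __init__(self):
        self.tasks = set()    # strong references to running tasks
        self.current = None   # most recently started handler task
        self.log = []

    async def handle(self, name):
        # Stand-in for self.trigger(name) with a repeating callback.
        self.log.append(f"enter {name}")
        try:
            while True:
                await asyncio.sleep(0.01)
        except asyncio.CancelledError:
            self.log.append(f"leave {name}")
            raise

    def dispatch(self, name):
        # Assumption: manual cancellation replaces what AsyncMachine
        # does itself when a new trigger is processed.
        if self.current is not None and not self.current.done():
            self.current.cancel()
        task = asyncio.create_task(self.handle(name))
        self.tasks.add(task)                        # keep a reference
        task.add_done_callback(self.tasks.discard)  # drop it when done
        self.current = task

async def demo():
    d = Dispatcher()
    for name in ("to_B", "to_A", "stop"):
        d.dispatch(name)           # returns immediately
        await asyncio.sleep(0.05)  # the "receive loop" stays responsive
    d.current.cancel()
    await asyncio.sleep(0.05)      # let the done callbacks run
    return d.log, len(d.tasks)

task_log, remaining = asyncio.run(demo())
print(task_log, remaining)
```

Because dispatch never awaits the handler, each new message is picked up while the previous handler is still looping, and the set is empty again once every task has finished or been cancelled.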