我正在尝试使用 transitions 设置异步状态机,可以使用 aiomqtt 通过 MQTT 进行控制。我设法获得了一个运行的最小工作示例,如果没有重复操作,该示例就可以运行:
状态机脚本:
import asyncio
import aiomqtt
from transitions.extensions import AsyncMachine
import sys
import os
import logging
logging.basicConfig(level=logging.DEBUG)
if sys.platform.lower() == "win32" or os.name.lower() == "nt":
from asyncio import set_event_loop_policy, WindowsSelectorEventLoopPolicy
set_event_loop_policy(WindowsSelectorEventLoopPolicy())
class MQTTStateMachine:
states = ['init','A','B',{'name': 'stopped', 'final':True}]
def __init__(self,client):
self.client = client
self.machine = AsyncMachine(model=self, states=MQTTStateMachine.states, initial='init')
self.machine.add_transition(trigger='init', source='init', dest='A')
self.machine.add_transition(trigger='stop', source=['A','B'], dest='stopped')
async def update_state(self):
await self.client.publish("MQTTstatemachine/machine/state", str(self.state))
async def receiveMQTT(self):
await self.client.subscribe("MQTTstatemachine/controller/transition")
async for message in self.client.messages:
if message.topic.matches("MQTTstatemachine/controller/transition"):
await self.trigger(message.payload.decode())
async def on_enter_A(self):
await self.update_state()
print("I'm now in state A.")
async def on_enter_B(self):
await self.update_state()
print("I'm now in state B.")
async def on_enter_stopped(self):
await self.update_state()
print("I'm now in state stopped.")
async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
MQTTmachine = MQTTStateMachine(client)
await MQTTmachine.init()
await asyncio.create_task(MQTTmachine.receiveMQTT())
if __name__ == "__main__":
asyncio.run(main())
控制器脚本:
import asyncio
import aiomqtt
import sys
import os
if sys.platform.lower() == "win32" or os.name.lower() == "nt":
from asyncio import set_event_loop_policy, WindowsSelectorEventLoopPolicy
set_event_loop_policy(WindowsSelectorEventLoopPolicy())
async def publishTransitions(client):
await asyncio.sleep(5)
await client.publish("MQTTstatemachine/controller/transition","to_B")
print("Transition: to_B")
await asyncio.sleep(5)
await client.publish("MQTTstatemachine/controller/transition","to_A")
print("Transition: to_A")
await asyncio.sleep(5)
await client.publish("MQTTstatemachine/controller/transition","to_B")
print("Transition: to_B")
await asyncio.sleep(5)
await client.publish("MQTTstatemachine/controller/transition","stop")
print("Transition: stop")
await asyncio.sleep(5)
async def receiveStates(client):
await client.subscribe("MQTTstatemachine/machine/state")
async for message in client.messages:
if message.topic.matches("MQTTstatemachine/machine/state"):
print(f"Statemachine now in state {message.payload.decode()}")
async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
tasks = [publishTransitions(client),receiveStates(client)]
pending = [asyncio.create_task(t) for t in tasks]
done, pending = await asyncio.wait(pending,return_when=asyncio.FIRST_COMPLETED)
pendingTask = pending.pop()
pendingTask.cancel()
try:
await pendingTask
except asyncio.CancelledError:
print(f"Finsihed.")
if __name__ == "__main__":
asyncio.run(main())
我尝试通过替换 on_enter_B 来执行一些重复操作:
async def on_enter_B(self):
while self.is_B():
await self.update_state()
print("I'm now in state B.")
await asyncio.sleep(1)
但随后它陷入状态 B,并且不再通过 MQTT 响应状态更改。
我尝试通过反射性过渡来实现重复任务,但这也不起作用:
async def on_enter_B(self):
await self.update_state()
print("I'm now in state B.")
await asyncio.sleep(1)
await self.to_B()
据我所知,问题就在这里:
async def receiveMQTT(self):
await self.client.subscribe("MQTTstatemachine/controller/transition")
async for message in self.client.messages:
if message.topic.matches("MQTTstatemachine/controller/transition"):
await self.trigger(message.payload.decode()) # [1]
当任何回调阻塞时,self.trigger
中的[1]
将不会返回,因此for循环永远不会到达下一个元素和/或receiveMQTT
本身会阻塞。
解决方案:不要等待
trigger
,而是将其作为一项任务 [2]。跟踪正在运行的任务 [3],以防止任务被垃圾收集器停止(有关详细信息,请参阅 python 文档)。当新的触发器到达时 AsyncMachine
应取消正在运行的触发器。这将调用完成回调,您可以使用它从任务列表中删除引用[4]
。在我的示例中 self.task
是一个集合(参见 [5])。
def __init__(self, client):
self.client = client
self.tasks = set() # [5]
# ...
async def receiveMQTT(self):
await self.client.subscribe("MQTTstatemachine/controller/transition")
async for message in self.client.messages:
if message.topic.matches("MQTTstatemachine/controller/transition"):
task = asyncio.create_task(self.trigger(message.payload.decode())) # [2]
self.tasks.add(task) # [3]
task.add_done_callback(self.tasks.discard) # [4]