我正在尝试设计我的自定义状态机基类,它在自己的线程循环中使用事件。我需要 Event、State、Actor 和 StateMachine 类。我想从 StateMachine 类继承我的 FSM 类并生成事件并更改其状态。事件将在 StateMachine 类的基类 Actor 类中使用。任何人都可以为此目的给我一个示例代码或任何链接。我正在提供代码以及我试图用状态图实现的内容。
import threading
import queue
import time
import abc
class Event:
def __init__(self, name, handler=None):
self.name = name if name else type(self).__name__
self.handler = handler
def handle(self):
if self.handler is not None:
self.handler()
pass
class Actor(threading.Thread):
count = 0
def __init__(self, name: str = None, queue_size: int = 1000):
actor_name = name if name else "Actor-{i}".format(i=Actor.count)
threading.Thread.__init__(self, name=actor_name)
Actor.count += 1
self._event_queue = queue.Queue(maxsize=queue_size)
self._stop_event = threading.Event()
self.gen_condition = threading.Condition()
self.setDaemon(True)
self.start()
def run(self):
while not self._stop_event.is_set():
if not self._event_queue.empty():
self.gen_condition.acquire()
event = self._event_queue.get()
self.gen_condition.notifyAll()
self.gen_condition.release()
try:
self.dispatch(event)
finally:
del event
time.sleep(0.005)
def dispatch(self, event):
raise NotImplementedError("Child class have to implement dispatch function!")
def gen(self, event: Event):
self.gen_condition.acquire()
self._event_queue.put(event)
self.gen_condition.notifyAll()
self.gen_condition.release()
def stop(self):
self._stop_event.set()
pass
class State:
def __init__(self, name: str, machine: 'StateMachine'):
self.name = name if name else type(self).__name__
self.machine = machine
self.transitions = {}
self.root = None
self.sub_states = {}
self.sub_state = None
self._sub_default = None
def add_sub_state(self, sub_state: 'State', default: bool = False):
sub_state.root = self
self.sub_states[sub_state.name] = sub_state
if default:
if self._sub_default is not None:
raise Exception("A default sub-state already exists for {name}!".format(name=self.name))
self._sub_default = sub_state
def add_transition(self, event: Event, next_state: 'State'):
self.transitions[event] = next_state
def on_entry(self):
self.machine.process_state_on_entry(self.name)
if self._sub_default is not None:
self.sub_state = self._sub_default
self.machine.change_state(self.sub_state)
def on_exit(self):
if self.sub_state is not None:
self.sub_state.on_exit()
self.sub_state = None
self.machine.process_state_on_exit(self.name)
def handle_event(self, event: Event):
if self.root is not None:
next_state = self.root.transitions.get(event, None)
if next_state is not None:
self.machine.change_state(next_state)
return
if self.sub_state is not None:
self.sub_state.handle_event(event)
return
next_state = self.transitions.get(event, None)
if next_state is not None:
self.machine.change_state(next_state)
pass
class StateMachine(Actor):
def __init__(self, name):
Actor.__init__(self, name=name)
self._current_state = None
def change_state(self, state: State):
if state.root:
state.root.sub_state = state
if (self._current_state is not None) and (self._current_state is not state.root):
self._current_state.on_exit()
self._current_state = state
self._current_state.on_entry()
def dispatch(self, event: Event):
print("Event:", event.name, "on State:", self._current_state.name)
self._current_state.handle_event(event)
@abc.abstractmethod
def process_state_on_entry(self, state_name: str):
raise NotImplementedError("State machine class have to implement process_on_entry function!")
@abc.abstractmethod
def process_state_on_exit(self, state_name: str):
raise NotImplementedError("State machine class have to implement process_on_exit function!")
pass
当我尝试我的代码时,它打印如下:
” 事件:evStart 状态:空闲
状态退出:空闲
状态进入:初始化
事件:evInitialized on State:Initialization
状态退出:初始化
状态进入:可操作
状态进入:准备
事件:evPrepared 状态:Preparation
状态出口:准备
状态入口:EntryScan
事件:状态上的 evInPosition:EntryScan
状态退出:EntryScan
状态进入:ExitScan
事件:状态上的 evError:ExitScan
状态退出:ExitScan
状态条目:错误
"
我不明白为什么当错误事件发生时它不执行操作状态的 on_exit 方法。任何人都可以给我一些建议吗?