Python 中的自定义状态机

问题描述 投票:0回答:0

我正在尝试设计我的自定义状态机基类,它在自己的线程循环中使用事件。我需要 Event、State、Actor 和 StateMachine 类。我想从 StateMachine 类继承我的 FSM 类并生成事件并更改其状态。事件将在 StateMachine 类的基类 Actor 类中使用。任何人都可以为此目的给我一个示例代码或任何链接。我正在提供代码以及我试图用状态图实现的内容。

import threading
import queue
import time
import abc


class Event:
    def __init__(self, name, handler=None):
        self.name = name if name else type(self).__name__
        self.handler = handler

    def handle(self):
        if self.handler is not None:
            self.handler()

    pass


class Actor(threading.Thread):
    count = 0

    def __init__(self, name: str = None, queue_size: int = 1000):
        actor_name = name if name else "Actor-{i}".format(i=Actor.count)
        threading.Thread.__init__(self, name=actor_name)
        Actor.count += 1
        self._event_queue = queue.Queue(maxsize=queue_size)
        self._stop_event = threading.Event()
        self.gen_condition = threading.Condition()
        self.setDaemon(True)
        self.start()

    def run(self):
        while not self._stop_event.is_set():
            if not self._event_queue.empty():
                self.gen_condition.acquire()
                event = self._event_queue.get()
                self.gen_condition.notifyAll()
                self.gen_condition.release()
                try:
                    self.dispatch(event)
                finally:
                    del event
            time.sleep(0.005)

    def dispatch(self, event):
        raise NotImplementedError("Child class have to implement dispatch function!")

    def gen(self, event: Event):
        self.gen_condition.acquire()
        self._event_queue.put(event)
        self.gen_condition.notifyAll()
        self.gen_condition.release()

    def stop(self):
        self._stop_event.set()

    pass


class State:
    def __init__(self, name: str, machine: 'StateMachine'):
        self.name = name if name else type(self).__name__
        self.machine = machine
        self.transitions = {}
        self.root = None
        self.sub_states = {}
        self.sub_state = None
        self._sub_default = None

    def add_sub_state(self, sub_state: 'State', default: bool = False):
        sub_state.root = self
        self.sub_states[sub_state.name] = sub_state
        if default:
            if self._sub_default is not None:
                raise Exception("A default sub-state already exists for {name}!".format(name=self.name))
            self._sub_default = sub_state

    def add_transition(self, event: Event, next_state: 'State'):
        self.transitions[event] = next_state

    def on_entry(self):
        self.machine.process_state_on_entry(self.name)
        if self._sub_default is not None:
            self.sub_state = self._sub_default
            self.machine.change_state(self.sub_state)

    def on_exit(self):
        if self.sub_state is not None:
            self.sub_state.on_exit()
            self.sub_state = None
        self.machine.process_state_on_exit(self.name)

    def handle_event(self, event: Event):
        if self.root is not None:
            next_state = self.root.transitions.get(event, None)
            if next_state is not None:
                self.machine.change_state(next_state)
                return

        if self.sub_state is not None:
            self.sub_state.handle_event(event)
            return

        next_state = self.transitions.get(event, None)
        if next_state is not None:
            self.machine.change_state(next_state)

    pass


class StateMachine(Actor):
    def __init__(self, name):
        Actor.__init__(self, name=name)
        self._current_state = None

    def change_state(self, state: State):
        if state.root:
            state.root.sub_state = state
        if (self._current_state is not None) and (self._current_state is not state.root):
            self._current_state.on_exit()
        self._current_state = state
        self._current_state.on_entry()

    def dispatch(self, event: Event):
        print("Event:", event.name, "on State:", self._current_state.name)
        self._current_state.handle_event(event)

    @abc.abstractmethod
    def process_state_on_entry(self, state_name: str):
        raise NotImplementedError("State machine class have to implement process_on_entry function!")

    @abc.abstractmethod
    def process_state_on_exit(self, state_name: str):
        raise NotImplementedError("State machine class have to implement process_on_exit function!")

    pass

当我尝试我的代码时,它打印如下:

” 事件:evStart 状态:空闲

状态退出:空闲

状态进入:初始化

事件:evInitialized on State:Initialization

状态退出:初始化

状态进入:可操作

状态进入:准备

事件:evPrepared 状态:Preparation

状态出口:准备

状态入口:EntryScan

事件:状态上的 evInPosition:EntryScan

状态退出:EntryScan

状态进入:ExitScan

事件:状态上的 evError:ExitScan

状态退出:ExitScan

状态条目:错误

"

https://imgur.com/a/vQ3W1Mi

我不明白为什么当错误事件发生时它不执行操作状态的 on_exit 方法。任何人都可以给我一些建议吗?

python event-handling state-machine fsm statechart
© www.soinside.com 2019 - 2024. All rights reserved.