如何在Python中动态添加线程或从线程池中选择线程

问题描述 投票:0回答:1

我希望安排线程执行的任务,并能够动态地在进程中添加或删除任务。我的目标是预先创建一个线程池,例如 [th1、th2、th3],然后通过对我的系统的 API 调用将任务(例如 t1、t2、t3)分配给这些线程。

当我收到像 t1 这样的任务时,我想从池中选择一个随机线程并将该任务分配给它。 到目前为止,我已经在 Python 中尝试了 ThreadPoolExecutor,但它似乎没有提供一种从已运行的进程中动态添加或删除任务的方法。

注意:我正在处理的任务是持续运行的事件侦听器,直到我决定停止侦听事件总线。因此,我认为拥有一个可以从中选择和分配事件侦听器任务的线程池将是有益的。


def run(task_arg):
    # core logic of the listener which continuously listens for the 
    # events

def using_executor():
    argument_dicts = [
        {"topic": "/captureSaveEvent"},
    ]
    logging.basicConfig()

    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        futures = {executor.submit(run, arg): arg for arg in argument_dicts}
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"Thread for {futures[future]} failed with exception: {e}")

在上面我已经定义了要在

argument_dicts
中提交的任务,但我想通过添加新线程或选择线程池中的线程并将任务分配给它来将任务添加到同一进程,我不这样做不想创建一个新进程并为我想添加的每个新任务创建新线程,因为该任务是不会退出的侦听器事件,因此只要我退出,创建线程的进程将继续处于活动状态侦听器或侦听器逻辑中的某些问题。

我期望的是,我需要知道进程 id,并且使用该 pid,我应该能够从该进程中选择一个线程,该线程可以自由地执行新任务并向其提交作业。

python threadpool python-multithreading concurrent.futures
1个回答
0
投票

此答案基于我从您的描述中收集到的以下要求:

  1. 用于运行任务的线程池
  2. 任务应连续运行,直到被命令停止
  3. 线程池可以接受命令来运行/停止任务

解决方案将涉及创建一个任务管理器线程来处理线程池并接受/删除任务。

from threading import Event, Lock, Thread

def task_func(arg, stop_event):
  while not stop_event.is_set():
    # core logic of the listener which continuously listens for the 
    # events

class TaskManager(Thread):
  def __init__(self, workers_count: int = 20):
    super().__init__(daemon=True)

    self.workers_count = workers_count

    self.__new_tasks = []
    self.__lock = Lock()
    self.__task_stop_events = {}
    self.__stop_event = Event()

  def run(self):
    with concurrent.futures.ThreadPoolExecutor(max_workers=self.workers_count) as executor:
      while not self.__stop_event.is_set():
        with self.__lock:
          for task in self.__new_tasks:
            stop_event = Event()
            executor.submit(task_func, task[1], stop_event)
            self.__task_stop_events[task[0]] = stop_event

          self.__new_tasks = []
        
        time.sleep(0.001)  # prevent task manager from hogging resource

  # add new task to run
  def start_task(self, task_id, task_arg):
    with self.__lock:
      self.__new_tasks.append((task_id, task_arg))

  # stop currently running task
  def stop_task(self, task_id):
    with self.__lock:
      if task_id in self.__task_stop_events:
        self.__task_stop_events[task_id].set()
        del self.__task_stop_events[task_id]
          
  def stop(self):
    self.__stop_event.set()

任务管理器的使用示例:

tm = TaskManager()
tm.start()  # start the task manager thread

...

tm.start_task('task_1', {"topic": "/captureSaveEvent"}) # add a task

...

tm.stop_task('task_1') # stop a task

...

tm.stop()  # stop task manager thread
© www.soinside.com 2019 - 2024. All rights reserved.