我有一个涉及多个线程的复杂应用程序,同时使用
QThreads
(来自 PyQt/PySide)和 Python 的 threading
模块。复杂性要么源于问题的本质,要么源于我解决问题的方法。
该应用程序有一个带有“停止”按钮的用户界面,我需要一种方法来在按下此按钮时立即停止所有正在运行的线程。我想避免添加大量停止标志并在所有嵌套线程中管理它们的状态。目标是允许用户通过单个操作停止所有操作,以便他们可以根据需要进行更改并重新启动进程。
有没有一种方法或途径可以实现这种线程瞬时终止?
您的线程将需要监视某种停止标志,但正如 @SolomonSlow 在评论中指出的那样,您只需要一个标志。他们都可以观看同一部。
考虑以下代码;它启动任意数量的线程(默认为 5 个),所有线程都监视同一个
threading.Event
对象;设置对象将导致所有线程退出:
import argparse
import time
import logging
import random
import threading
LOG = logging.getLogger(__name__)
class Worker(threading.Thread):
def __init__(self, jobId, stopFlag):
self.jobId = jobId
self.stopFlag = stopFlag
super().__init__()
def run(self):
LOG.info("starting job %s", self.jobId)
while not self.stopFlag.wait(timeout=random.randint(1, 10)):
LOG.info("doing some work in job %s", self.jobId)
LOG.info("all done with job %s", self.jobId)
def parse_args():
p = argparse.ArgumentParser()
p.add_argument("-n", "--num-threads", type=int, default=5)
return p.parse_args()
def main():
logging.basicConfig(
level=logging.DEBUG, format="%(asctime)s %(message)s", datefmt="%H:%M:%S"
)
args = parse_args()
stopFlag = threading.Event()
tasks = []
for i in range(args.num_threads):
t = Worker(i, stopFlag)
tasks.append(t)
t.start()
time.sleep(30)
LOG.info("sending stop signal")
stopFlag.set()
for t in tasks:
t.join()
LOG.info("all done")
if __name__ == "__main__":
main()
运行这个看起来像:
10:45:28 starting job 0
10:45:28 starting job 1
10:45:28 starting job 2
10:45:28 starting job 3
10:45:28 starting job 4
10:45:29 doing some work in job 1
10:45:29 doing some work in job 4
10:45:32 doing some work in job 3
10:45:33 doing some work in job 0
10:45:35 doing some work in job 0
10:45:38 doing some work in job 2
10:45:38 doing some work in job 1
.
.
.
10:45:52 doing some work in job 1
10:45:55 doing some work in job 4
10:45:57 doing some work in job 2
10:45:58 sending stop signal
10:45:58 all done with job 3
10:45:58 all done with job 2
10:45:58 all done with job 4
10:45:58 all done with job 1
10:45:58 all done with job 0
10:45:58 all done
我需要一种方法来在按下此按钮时立即停止所有正在运行的线程
“立即”很棘手,并且实际上取决于线程中的代码是什么样的。在此处的示例中,线程几乎立即退出,但这是因为它们花费大量时间只是等待停止标志。如果您的线程在检查停止标志之间执行大量工作,这将影响您按下“停止”按钮时它们响应所需的时间。
如果您的线程花费大量时间等待文件描述符,那么您可能只想创建管道并在希望停止时关闭它,而不是使用
threading.Event
对象。然后,您可以在用于处理文件或网络 I/O 的同一个 select
/poll
循环中观看它:
import os
import argparse
import select
import time
import logging
import random
import threading
LOG = logging.getLogger(__name__)
class Worker(threading.Thread):
def __init__(self, jobId, stopFlag):
self.jobId = jobId
self.stopFlag = stopFlag
super().__init__()
def run(self):
LOG.info("starting job %s", self.jobId)
poll = select.poll()
poll.register(self.stopFlag, select.POLLHUP)
while True:
events = poll.poll(1000 * random.randint(1, 10))
for fd, event in events:
if fd == self.stopFlag:
break
else:
LOG.info("doing some work in job %s", self.jobId)
continue
break
LOG.info("all done with job %s", self.jobId)
def parse_args():
p = argparse.ArgumentParser()
p.add_argument("-n", "--num-threads", type=int, default=5)
p.add_argument("-w", "--wait", type=int, default=30)
return p.parse_args()
def main():
logging.basicConfig(
level=logging.DEBUG, format="%(asctime)s %(message)s", datefmt="%H:%M:%S"
)
args = parse_args()
stopFlag = os.pipe()
tasks = []
for i in range(args.num_threads):
t = Worker(i, stopFlag[0])
tasks.append(t)
t.start()
time.sleep(args.wait)
LOG.info("sending stop signal")
os.close(stopFlag[1])
for t in tasks:
t.join()
LOG.info("all done")
if __name__ == "__main__":
main()