我正在编写一个程序,该程序启动一个线程来生成“工作”并每 N 秒将其添加到队列中。然后,我有一个线程池来处理队列中的项目。
下面的程序运行得很好,直到我注释掉/删除第 #97 行(在主函数中
time.sleep(0.5)
)。一旦我这样做,它会生成一个 RuntimeError ,它试图正常停止程序(通过向主进程发送 SIGINT 或 SIGTERM )。它甚至可以在 0.1 秒这样极短的睡眠时间下正常工作,但完全没有睡眠时间就会出现问题。
我尝试研究“重入性”,但不幸的是它有点超出了我的理解范围。
谁能帮助我理解这一点?
代码:
import random
import signal
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime
from queue import Empty, Queue, SimpleQueue
from typing import Any
class UniqueQueue:
"""
A thread safe queue which can only ever contain unique items.
"""
def __init__(self) -> None:
self._q = Queue()
self._items = []
self._l = threading.Lock()
def get(self, block: bool = False, timeout: float | None = None) -> Any:
with self._l:
try:
item = self._q.get(block=block, timeout=timeout)
except Empty:
raise
else:
self._items.pop(0)
return item
def put(self, item: Any, block: bool = False, timeout: float | None = None) -> None:
with self._l:
if item in self._items:
return None
self._items.append(item)
self._q.put(item, block=block, timeout=timeout)
def size(self) -> int:
return self._q.qsize()
def empty(self) -> bool:
return self._q.empty()
def stop_app(sig_num, sig_frame) -> None:
# global stop_app_event
print("Signal received to stop the app")
stop_app_event.set()
def work_generator(q: UniqueQueue) -> None:
last_execution = time.time()
is_first_execution = True
while not stop_app_event.is_set():
elapsed_seconds = int(time.time() - last_execution)
if elapsed_seconds <= 10 and not is_first_execution:
time.sleep(0.5)
continue
last_execution = time.time()
is_first_execution = False
print("Generating work...")
for _ in range(100):
q.put({"n": random.randint(0, 500)})
def print_work(w) -> None:
print(f"{datetime.now()}: {w}")
def main():
# Create a work queue
work_queue = UniqueQueue()
# Create a thread to generate the work and add to the queue
t = threading.Thread(target=work_generator, args=(work_queue,))
t.start()
# Create a thread pool, get work from the queue, and submit to the pool for processing
pool = ThreadPoolExecutor(max_workers=20)
futures: list[Future] = []
while True:
print("Processing work...")
if stop_app_event.is_set():
print("stop_app_event is set:", stop_app_event.is_set())
for future in futures:
future.cancel()
break
print("Queue Size:", work_queue.size())
try:
while not work_queue.empty():
work = work_queue.get()
future = pool.submit(print_work, work)
futures.append(future)
except Empty:
pass
time.sleep(0.5)
print("Stopping the work generator thread...")
t.join(timeout=10)
print("Work generator stopped")
print("Stopping the thread pool...")
pool.shutdown(wait=True)
print("Thread pool stopped")
if __name__ == "__main__":
stop_app_event = threading.Event()
signal.signal(signalnum=signal.SIGINT, handler=stop_app)
signal.signal(signalnum=signal.SIGTERM, handler=stop_app)
main()
这是因为你在信号处理程序中调用了
print()
,stop_app()
。
C 中信号处理程序在后台线程中执行,但在 Python 中它在主线程中执行。 (参见参考。)在您的情况下,在执行
print()
调用时,另一个 print()
被调用,术语“可重入”在这里非常适合。并且当前的IO堆栈禁止重入调用。(如果您有兴趣,请参阅实现。)
您可以使用
os.write()
和 sys.stdout
来解决此问题,如下所示。
import sys
import os
...
def stop_app(sig_num, sig_frame):
os.write(sys.stdout.fileno(), b"Signal received to stop the app\n")
stop_app_event.set()