我正在使用Python的ThreadPoolExecutor来实现异步客户端。每隔一段时间,脚本就会向线程池提交一个可调用的同步客户端。我希望能够使用 KeyboardInterrupt 来停止循环。
我编写了以下代码。
#!/usr/bin/env python
import numpy as np
import tritonclient.http as tritonclient
import argparse
import itertools
import logging
import random
import sys
import time
from concurrent.futures import ThreadPoolExecutor
distributions = {
'poisson': lambda w: random.expovariate(1/w),
'uniform': lambda w: random.uniform(0, 2*w),
}
class Client:
def __init__(self, url, model):
self.client = tritonclient.InferenceServerClient(url)
config = self.client.get_model_config(model)
self.inputs = config['input']
self.outputs = [output['name'] for output in config['output']]
self.model = model
def __call__(self):
inputs = []
for config in self.inputs:
assert config['data_type'] == 'TYPE_FP32'
shape = [1] + config['dims']
datatype = config['data_type'].removeprefix('TYPE_')
input = tritonclient.InferInput(config['name'], shape, datatype)
array = np.random.default_rng().random(shape, dtype=np.float32)
input.set_data_from_numpy(array)
inputs.append(input)
result = self.client.infer(self.model, inputs)
for output in self.outputs:
result.get_output(output)
def benchmark(fn):
t_i = time.time()
fn()
t_f = time.time()
print(t_i, t_f - t_i)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-d', '--distribution', choices=distributions.values(),
type=distributions.get, default=lambda w: w)
parser.add_argument('-n', '--nrequests', default=-1, type=int)
parser.add_argument('-o', '--open', '--open-loop', action='store_true')
parser.add_argument('-u', '--url', default='localhost:8000')
parser.add_argument('-v', '--verbose', action='count', default=0)
parser.add_argument('model')
rate = parser.add_mutually_exclusive_group()
rate.add_argument('-w', '--wait', '--delay', '-l', '--lambda',
default=0, type=float)
rate.add_argument('-r', '--rate', '-f', '--frequency', type=float)
args = parser.parse_args()
level = (logging.DEBUG if args.verbose > 1
else logging.INFO if args.verbose
else logging.WARNING)
logging.basicConfig(level=level)
if args.rate:
args.wait = 1/args.rate
logging.debug(args)
client = Client(args.url, args.model)
with ThreadPoolExecutor() as executor:
try:
for _ in (itertools.count() if args.nrequests < 0
else range(args.nrequests)):
if args.open:
executor.submit(benchmark, client)
else:
benchmark(client)
time.sleep(args.distribution(args.wait))
except KeyboardInterrupt:
pass
except BrokenPipeError:
pass
它挂在第一个 Control-C 上,并且需要另外两个才能最终退出并出现以下错误。无论如何,我今天第一次使用 ChatGPT 是为了解决这个问题。它不起作用。真是令人失望。
1717617460.23863 0.003475189208984375
1717617460.250774 0.0033867359161376953
1717617460.2690861 0.0033500194549560547
^C^CTraceback (most recent call last):
File "/data/pcoppock/mlos/apps/tritonclient", line 73, in <module>
with ThreadPoolExecutor() as executor:
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 649, in __exit__
self.shutdown(wait=True)
File "/usr/lib/python3.10/concurrent/futures/thread.py", line 235, in shutdown
t.join()
File "/usr/lib/python3.10/threading.py", line 1096, in join
self._wait_for_tstate_lock()
File "/usr/lib/python3.10/threading.py", line 1116, in _wait_for_tstate_lock
if lock.acquire(block, timeout):
KeyboardInterrupt
^CException ignored in: <module 'threading' from '/usr/lib/python3.10/threading.py'>
Traceback (most recent call last):
File "/usr/lib/python3.10/threading.py", line 1537, in _shutdown
atexit_call()
File "/usr/lib/python3.10/concurrent/futures/thread.py", line 31, in _python_exit
t.join()
File "/usr/lib/python3.10/threading.py", line 1096, in join
self._wait_for_tstate_lock()
File "/usr/lib/python3.10/threading.py", line 1116, in _wait_for_tstate_lock
if lock.acquire(block, timeout):
KeyboardInterrupt:
linux$
linux 发送
SIGINT
到任意线程,该线程冒泡到主线程,但是主线程必须等待所有线程结束才能终止,这就是为什么你需要第二个 Ctrl-C。
没有干净的方法来解决这个问题,您可能需要 处理 SIGINT 并设置一些布尔值来使所有线程终止,但是当您使用同步 IO 时,您无法完全中断正在进行的 IO。
唯一可靠的方法是在 Ctrl-C 发生时突然终止应用程序,方法是让信号处理程序执行 os._exit(),这会杀死应用程序而不进行任何清理。
import sys
import time
from concurrent.futures import ThreadPoolExecutor
import signal
import os
def signal_handler(sig, frame):
# KILL THE APPLICATION RIGHT NOW !!
os._exit(0)
signal.signal(signal.SIGINT, signal_handler)
class Client:
def __init__(self):
pass
def __call__(self):
start = time.time()
while time.time() - start < 10:
pass
def benchmark(fn):
t_i = time.time()
fn()
t_f = time.time()
print(t_i, t_f - t_i)
if __name__ == '__main__':
client = Client()
with ThreadPoolExecutor() as executor:
try:
task = executor.submit(benchmark, client)
task.result()
except KeyboardInterrupt:
pass
except BrokenPipeError:
pass
虽然以这种方式杀死 python 通常是不好的,因为没有进行清理,但另一种方法是等待所有任务完成,或者设置一个线程将定期检查的布尔值,这两种方法都不适用于您的情况。