使用键盘中断关闭ThreadPoolExecutor

问题描述 投票:0回答:1

我正在使用Python的ThreadPoolExecutor来实现异步客户端。每隔一段时间,脚本就会向线程池提交一个可调用的同步客户端。我希望能够使用 KeyboardInterrupt 来停止循环。

我编写了以下代码。

#!/usr/bin/env python

import numpy as np
import tritonclient.http as tritonclient

import argparse
import itertools
import logging
import random
import sys
import time
from concurrent.futures import ThreadPoolExecutor

distributions = {
    'poisson': lambda w: random.expovariate(1/w),
    'uniform': lambda w: random.uniform(0, 2*w),
}

class Client:
    def __init__(self, url, model):
        self.client = tritonclient.InferenceServerClient(url)
        config = self.client.get_model_config(model)
        self.inputs = config['input']
        self.outputs = [output['name'] for output in config['output']]
        self.model = model

    def __call__(self):
        inputs = []
        for config in self.inputs:
            assert config['data_type'] == 'TYPE_FP32'
            shape = [1] + config['dims']
            datatype = config['data_type'].removeprefix('TYPE_')
            input = tritonclient.InferInput(config['name'], shape, datatype)
            array = np.random.default_rng().random(shape, dtype=np.float32)
            input.set_data_from_numpy(array)
            inputs.append(input)
        result = self.client.infer(self.model, inputs)
        for output in self.outputs:
            result.get_output(output)

def benchmark(fn):
    t_i = time.time()
    fn()
    t_f = time.time()
    print(t_i, t_f - t_i)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-d', '--distribution', choices=distributions.values(),
                        type=distributions.get, default=lambda w: w)
    parser.add_argument('-n', '--nrequests', default=-1, type=int)
    parser.add_argument('-o', '--open', '--open-loop', action='store_true')
    parser.add_argument('-u', '--url', default='localhost:8000')
    parser.add_argument('-v', '--verbose', action='count', default=0)
    parser.add_argument('model')
    rate = parser.add_mutually_exclusive_group()
    rate.add_argument('-w', '--wait', '--delay', '-l', '--lambda',
                      default=0, type=float)
    rate.add_argument('-r', '--rate', '-f', '--frequency', type=float)
    args = parser.parse_args()

    level = (logging.DEBUG if args.verbose > 1
            else logging.INFO if args.verbose
            else logging.WARNING)
    logging.basicConfig(level=level)

    if args.rate:
        args.wait = 1/args.rate
    logging.debug(args)

    client = Client(args.url, args.model)

    with ThreadPoolExecutor() as executor:
        try:
            for _ in (itertools.count() if args.nrequests < 0
                        else range(args.nrequests)):
                if args.open:
                    executor.submit(benchmark, client)
                else:
                    benchmark(client)
                time.sleep(args.distribution(args.wait))
        except KeyboardInterrupt:
            pass
        except BrokenPipeError:
            pass

它挂在第一个 Control-C 上,并且需要另外两个才能最终退出并出现以下错误。无论如何,我今天第一次使用 ChatGPT 是为了解决这个问题。它不起作用。真是令人失望。

1717617460.23863 0.003475189208984375
1717617460.250774 0.0033867359161376953
1717617460.2690861 0.0033500194549560547
^C^CTraceback (most recent call last):
  File "/data/pcoppock/mlos/apps/tritonclient", line 73, in <module>
    with ThreadPoolExecutor() as executor:
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 649, in __exit__
    self.shutdown(wait=True)
  File "/usr/lib/python3.10/concurrent/futures/thread.py", line 235, in shutdown
    t.join()
  File "/usr/lib/python3.10/threading.py", line 1096, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.10/threading.py", line 1116, in _wait_for_tstate_lock
    if lock.acquire(block, timeout):
KeyboardInterrupt
^CException ignored in: <module 'threading' from '/usr/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1537, in _shutdown
    atexit_call()
  File "/usr/lib/python3.10/concurrent/futures/thread.py", line 31, in _python_exit
    t.join()
  File "/usr/lib/python3.10/threading.py", line 1096, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.10/threading.py", line 1116, in _wait_for_tstate_lock
    if lock.acquire(block, timeout):
KeyboardInterrupt: 

linux$ 
python multithreading networking signals
1个回答
0
投票

linux 发送

SIGINT
到任意线程,该线程冒泡到主线程,但是主线程必须等待所有线程结束才能终止,这就是为什么你需要第二个 Ctrl-C。

没有干净的方法来解决这个问题,您可能需要 处理 SIGINT 并设置一些布尔值来使所有线程终止,但是当您使用同步 IO 时,您无法完全中断正在进行的 IO。

唯一可靠的方法是在 Ctrl-C 发生时突然终止应用程序,方法是让信号处理程序执行 os._exit(),这会杀死应用程序而不进行任何清理。

import sys
import time
from concurrent.futures import ThreadPoolExecutor
import signal
import os

def signal_handler(sig, frame):
    # KILL THE APPLICATION RIGHT NOW !!
    os._exit(0)

signal.signal(signal.SIGINT, signal_handler)

class Client:
    def __init__(self):
        pass

    def __call__(self):
        start = time.time()
        while time.time() - start < 10:
            pass

def benchmark(fn):
    t_i = time.time()
    fn()
    t_f = time.time()
    print(t_i, t_f - t_i)

if __name__ == '__main__':
    client = Client()
    with ThreadPoolExecutor() as executor:
        try:
            task = executor.submit(benchmark, client)
            task.result()
        except KeyboardInterrupt:
            pass
        except BrokenPipeError:
            pass

虽然以这种方式杀死 python 通常是不好的,因为没有进行清理,但另一种方法是等待所有任务完成,或者设置一个线程将定期检查的布尔值,这两种方法都不适用于您的情况。

© www.soinside.com 2019 - 2024. All rights reserved.