我正在使用 Python 的
ThreadPoolExecutor
在不同线程上同时执行一个函数。执行的函数返回一个枚举PytestResult
,然后我记录返回的结果并将其附加到数据库中:
with concurrent.futures.ThreadPoolExecutor(max_workers=len(self.vm_pool)) as executor:
futures = {executor.submit(
self.execute_test, test): test for test in self.test_pool}
for future in concurrent.futures.as_completed(futures):
test = futures[future]
failure_reason = None
try:
result = future.result()
if result == PytestResult.PASSED:
logging.info(f'Test {test} passed')
status = TestStatus.PASSED
else:
logging.error(
f'Test {test} failed: {PytestResult[result]}')
status = TestStatus.FAILED
except Exception as e:
logging.error(f'Test {test} failed with exception: {e}')
status = TestStatus.EXCEPTION
failure_reason = str(e)
finally:
logging.info(f'Updating test "{test}" status to "{status}"')
database.update_test_status(self.pipeline, test.id, status, failure_reason)
我想检查所有的 future 是否都返回了成功的结果,所以我这样做了:
return all(future.result() == PytestResult.PASSED for future in futures)
但是,我不确定如果结果已经缓存/计算,第二次调用
future.result()
是否是一个好的做法。我知道我可以在列表中跟踪已通过的测试,但我想做一个“一行”来轻松检查所有测试是否都成功。
您可以多次拨打
Future.result()
。
完成后,结果存储在
Future
对象上,使用 .result()
多次检索它不会重新计算它或执行任何有意义的工作。
如果您对其内部结构感兴趣,
concurrent.futures.Future
源代码非常容易阅读:
https://github.com/python/cpython/blob/3.12/Lib/concurrent/futures/_base.py#L428