我正在开发一个应用程序,用于从彼此绑定的不同服务器获取数据。 基本上,每个服务器都有一个主要区域,如美洲或亚洲,以及子区域,如北美、巴西。每个服务器都有自己的有界子区域。
主要区域和子区域不共享方法,所以我想并行运行。 我的基本方法是使用 executor.map() 为每个区域创建一个线程,然后在每个新线程内部为细分创建线程。
所有线程都应该无限期且独立地运行。
代码是这样的:
def main_region_fetching(region_list):
cur_region = next(iter(region_list.keys()))
with concurrent.futures.ThreadPoolExecutor(
max_workers=len(region_list["cur_region"])
) as executor:
for region in region_list[cur_region]:
executor.submit(sub_region_fetching, region)
# The main region code should continue in here, but it doesn't reach.
with concurrent.futures.ThreadPoolExecutor(max_workers=len(regions)) as executor:
executor.map(main_region_fetching, regions)
sub_region_fetching 会阻塞主线程的继续。它是否等待线程的完成?我可以让它运行线程而不阻塞主线程吗?因为它应该无限期地运行。 还有其他更好的选择吗?例如使用异步。
executor.map
不是一个阻塞函数,但你需要通过消耗返回的迭代器来等待它结束,否则它将取消任何未启动的任务,错误也会被忽略,你不会知道你的应用程序无法运行因为一个错误。
如果您不想阻塞主线程,那么只需创建一个新的
threading.thread
来管理这些子线程。
from concurrent.futures import ThreadPoolExecutor
from threading import Thread
def per_worker_function(worker_id):
print(f"worker: {worker_id}")
def workers_manager_function():
with ThreadPoolExecutor(max_workers=5) as executor:
# list properly consumes the returned iterator
list(executor.map(per_worker_function, range(5)))
manager_thread = Thread(target=workers_manager_function)
manager_thread.start()
print("main thread doing work")
# destructor gets called when manager_thread is out of scope
# joining it and waiting for the work to be done.
main thread doing work
worker: 0
worker: 1
worker: 2
worker: 3
worker: 4