如何并发使用ThreadPoolExecutor和无限循环

问题描述 投票:0回答:1

我正在开发一个应用程序,用于从彼此绑定的不同服务器获取数据。 基本上,每个服务器都有一个主要区域,如美洲或亚洲,以及子区域,如北美、巴西。每个服务器都有自己的有界子区域。

主要区域和子区域不共享方法,所以我想并行运行。 我的基本方法是使用 executor.map() 为每个区域创建一个线程,然后在每个新线程内部为细分创建线程。

所有线程都应该无限期且独立地运行。

代码是这样的:

def main_region_fetching(region_list):
    cur_region = next(iter(region_list.keys()))
    with concurrent.futures.ThreadPoolExecutor(
            max_workers=len(region_list["cur_region"])
        ) as executor:
            for region in region_list[cur_region]:
                executor.submit(sub_region_fetching, region)

    # The main region code should continue in here, but it doesn't reach.

with concurrent.futures.ThreadPoolExecutor(max_workers=len(regions)) as executor:
    executor.map(main_region_fetching, regions)

sub_region_fetching 会阻塞主线程的继续。它是否等待线程的完成?我可以让它运行线程而不阻塞主线程吗?因为它应该无限期地运行。 还有其他更好的选择吗?例如使用异步。

python multithreading concurrency
1个回答
0
投票

executor.map
不是一个阻塞函数,但你需要通过消耗返回的迭代器来等待它结束,否则它将取消任何未启动的任务,错误也会被忽略,你不会知道你的应用程序无法运行因为一个错误。

如果您不想阻塞主线程,那么只需创建一个新的

threading.thread
来管理这些子线程。

from concurrent.futures import ThreadPoolExecutor
from threading import Thread
def per_worker_function(worker_id):
    print(f"worker: {worker_id}")

def workers_manager_function():
    with ThreadPoolExecutor(max_workers=5) as executor:
        # list properly consumes the returned iterator
        list(executor.map(per_worker_function, range(5)))

manager_thread = Thread(target=workers_manager_function)
manager_thread.start()

print("main thread doing work")

# destructor gets called when manager_thread is out of scope 
# joining it and waiting for the work to be done.
main thread doing work
worker: 0
worker: 1
worker: 2
worker: 3
worker: 4
© www.soinside.com 2019 - 2024. All rights reserved.