Multithreading stuck on the last future

import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed

def download_files_from_folder(base_url, folder_name):
    folder_url = f"{base_url}{folder_name}/"
    response = requests.get(folder_url)
    soup = BeautifulSoup(response.content, "html.parser")
    links = soup.find_all("a")
    peninsular_rain_rates = []
    east_rain_rates = []
    completed_urls = []
    with ThreadPoolExecutor(max_workers=15) as executor:
        futures = {
            executor.submit(process_gz_file, folder_url + link.get("href")): link.get("href")
            for link in links
            if link.get("href").endswith(".gz")
        }
        for future in as_completed(futures):
            peninsular, east, completed = future.result()
            peninsular_rain_rates.extend(peninsular)
            east_rain_rates.extend(east)
            completed_urls.extend(completed)
            futures.pop(future)
            print(len(futures))

In this code I fetch the URL links of the .gz files from a website (about 8000 files in total, each roughly 1.5 MB), process them, and finally extend the results into lists. The problem is that the code seems to get stuck on the last future; I know this because I pop each future as it completes and print the length of the remaining futures dict. I have tried a smaller number of files (e.g. 50 links) and it runs fine.

Could this be a memory issue?
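One possibility worth ruling out besides memory: requests.get has no default timeout, so a single stalled download looks exactly like being stuck on the last future. A minimal sketch of that check, assuming process_gz_file (not shown above) fetches each URL with requests:

import requests

def process_gz_file(url):
    # Hypothetical shape of the real function, shown only to illustrate
    # where the timeout goes. (connect, read) timeouts in seconds: this
    # raises requests.Timeout instead of hanging forever if the server
    # stops responding mid-transfer.
    response = requests.get(url, timeout=(5, 120))
    response.raise_for_status()
    ...  # decompress and parse as in the real code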

python multithreading concurrent.futures
1 Answer

First, I am not sure why you are making futures a dictionary, since you never use the Future instances returned by as_completed to index into it. If you really feel the need to remove from futures the instances representing completed tasks, and, as SIGUP suggested, that removal is what causes the problem, then instead of using as_completed, use a callback, as in the following template:

import time
from concurrent.futures import ThreadPoolExecutor

def worker(n):
    # emulate work
    time.sleep(1)
    return n

def submit_tasks():
    results = []
    futures = set()

    def done_callback(future):
        futures.remove(future)
        print(len(futures))
        results.append(future.result())

    ...
    with ThreadPoolExecutor(max_workers=15) as executor:
        # Note that I am not iterating futures concurrently with
        # members being removed from it, which would cause an error:
        for n in range(100):
            future = executor.submit(worker, n)
            futures.add(future)
            future.add_done_callback(done_callback)
    # There is an implicit call to executor.shutdown(wait=True) here.
    # By the time we reach this point, the results list has been completely
    # filled in and the length of futures is 0:
    assert len(results) == 100
    assert len(futures) == 0

submit_tasks()
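
For reference, here is a rough sketch of the same callback pattern applied to the download code from the question. It assumes process_gz_file exists as in the question and returns a (peninsular, east, completed) tuple:

import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor

def download_files_from_folder(base_url, folder_name):
    folder_url = f"{base_url}{folder_name}/"
    soup = BeautifulSoup(requests.get(folder_url).content, "html.parser")
    gz_urls = [folder_url + a.get("href")
               for a in soup.find_all("a")
               if a.get("href", "").endswith(".gz")]

    peninsular_rain_rates = []
    east_rain_rates = []
    completed_urls = []
    futures = set()

    def done_callback(future):
        # Runs as soon as the task finishes, in the thread that ran it
        futures.remove(future)
        print(len(futures))
        peninsular, east, completed = future.result()
        peninsular_rain_rates.extend(peninsular)
        east_rain_rates.extend(east)
        completed_urls.extend(completed)

    with ThreadPoolExecutor(max_workers=15) as executor:
        for url in gz_urls:
            future = executor.submit(process_gz_file, url)
            futures.add(future)
            future.add_done_callback(done_callback)
    # Implicit executor.shutdown(wait=True): every callback has run by here
    return peninsular_rain_rates, east_rain_rates, completed_urls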

Perhaps a simpler approach is to use a multiprocessing.pool.ThreadPool instance with imap or imap_unordered, depending on whether you need the results list to contain results in task-submission order:

import time
from multiprocessing.pool import ThreadPool

def worker(n):
    # emulate work
    time.sleep(1)
    return n

def submit_tasks():
    results = []

    # Here we could just use: tasks = range(100)
    # But if you have something more complicated, then use a
    # generator expression or generator function for space efficiency
    # as follows:
    tasks = (n for n in range(100))
    tasks_left_to_complete = 100

    with ThreadPool(15) as pool:
        for result in pool.imap_unordered(worker, tasks):
            results.append(result)
            tasks_left_to_complete -= 1
            print(tasks_left_to_complete)

submit_tasks()
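
Applied to the question's task, the imap_unordered variant might look like this sketch, where gz_urls is the list of .gz links already scraped and process_gz_file again returns a (peninsular, east, completed) tuple:

from multiprocessing.pool import ThreadPool

def download_all(gz_urls):
    peninsular_rain_rates = []
    east_rain_rates = []
    completed_urls = []
    tasks_left = len(gz_urls)

    with ThreadPool(15) as pool:
        # imap_unordered yields each result as soon as its task finishes,
        # regardless of submission order
        for peninsular, east, completed in pool.imap_unordered(process_gz_file, gz_urls):
            peninsular_rain_rates.extend(peninsular)
            east_rain_rates.extend(east)
            completed_urls.extend(completed)
            tasks_left -= 1
            print(tasks_left)

    return peninsular_rain_rates, east_rain_rates, completed_urls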