def download_files_from_folder(base_url, folder_name):
    """Fetch the index page at base_url/folder_name/ and process every linked .gz file.

    Scrapes all <a> links from the folder's HTML listing, submits each
    ``.gz`` link to ``process_gz_file`` on a thread pool, and aggregates
    the per-file results into three lists.

    Note: the original version popped each finished future out of the
    ``futures`` dict while ``as_completed(futures)`` was iterating, purely
    to print a progress count. Mutating the mapping mid-iteration is
    unnecessary and was the suspected cause of the reported hang; a plain
    countdown gives the same progress output without touching the dict.
    """
    folder_url = f"{base_url}{folder_name}/"
    response = requests.get(folder_url)
    soup = BeautifulSoup(response.content, "html.parser")
    links = soup.find_all("a")
    peninsular_rain_rates = []
    east_rain_rates = []
    completed_urls = []
    with ThreadPoolExecutor(max_workers=15) as executor:
        # Guard against <a> tags without an href (link.get("href") is None),
        # which would crash the .endswith() test.
        futures = {
            executor.submit(process_gz_file, folder_url + href): href
            for link in links
            if (href := link.get("href")) and href.endswith(".gz")
        }
        remaining = len(futures)
        for future in as_completed(futures):
            # process_gz_file returns three lists; presumably
            # (peninsular rates, east rates, completed URLs) — TODO confirm.
            peninsular, east, completed = future.result()
            peninsular_rain_rates.extend(peninsular)
            east_rain_rates.extend(east)
            completed_urls.extend(completed)
            remaining -= 1
            print(remaining)
在此代码中,我尝试从网站获取约 8000 个 `.gz` 文件(每个约 1.5 MB)的 URL 链接并进行处理,最后把结果 extend 到数组中。但问题是代码似乎卡在最后一个 future 上——我是通过在每个 future 完成时将其从字典中弹出、并打印剩余 futures 字典的长度发现这一点的。我尝试过用较少数量的文件(例如 50 个链接),运行良好。
这可能是内存问题吗?
首先,我不确定你为什么要构建 `futures` 字典,因为你并没有用 `as_completed` 返回的 `Future` 实例去索引这个字典。如果你确实觉得需要从 `futures` 中删除那些代表已完成任务的实例,而且(正如 SIGUP 所指出的)这样做会导致问题,那么就不要使用 `as_completed`,而是改用回调(callback),如以下模板所示:
from concurrent.futures import ThreadPoolExecutor
def worker(n):
    """Stand-in task: pause one second to emulate work, then echo *n* back."""
    from time import sleep
    sleep(1)
    return n
def submit_tasks():
    """Fan out 100 worker tasks; a done-callback prunes each finished future."""
    results = []
    pending = set()

    def on_done(fut):
        # Fires as each task finishes: drop the future, report progress,
        # and record its result.
        pending.remove(fut)
        print(len(pending))
        results.append(fut.result())

    with ThreadPoolExecutor(max_workers=15) as executor:
        # Note that `pending` is never iterated concurrently with
        # members being removed from it, which would cause an error:
        for task_id in range(100):
            fut = executor.submit(worker, task_id)
            pending.add(fut)
            fut.add_done_callback(on_done)
    # Leaving the with-block implies executor.shutdown(wait=True):
    # by the time we get here, `results` is fully populated and the
    # `pending` set is empty.
    assert len(results) == 100
    assert len(pending) == 0
# Run the callback-based demo.
submit_tasks()
也许更简单的方法是使用 `multiprocessing.pool.ThreadPool` 实例的 `imap` 或 `imap_unordered` 方法,具体取决于你是否希望 `results` 列表按任务提交的顺序存放结果:
from multiprocessing.pool import ThreadPool
def worker(n):
    """Pretend to do one second's worth of work, then return the argument."""
    import time  # local import, matching the original example's style
    time.sleep(1)
    return n
def submit_tasks():
    """Run 100 worker tasks through ThreadPool.imap_unordered, printing a countdown."""
    results = []
    # Here we could just use: range(100). But for anything more complicated,
    # a generator expression or generator function keeps memory flat.
    task_stream = (i for i in range(100))
    remaining = 100
    with ThreadPool(15) as pool:
        # imap_unordered yields results as tasks finish, not in submit order.
        for value in pool.imap_unordered(worker, task_stream):
            results.append(value)
            remaining -= 1
            print(remaining)
# Run the ThreadPool-based demo.
submit_tasks()