I'm running into a memory leak in a Django application that makes asynchronous HTTP requests with aiohttp. I've set up a function that fetches data from a REST API, but despite my attempts to manage memory carefully, I'm seeing memory usage grow significantly with every batch of requests processed.
Here is a simplified version of my code:
import logging
from django.http import JsonResponse
import aiohttp
import asyncio
import gc
from memory_profiler import profile

logger = logging.getLogger(__name__)


async def fetch(session, url):
    async with session.get(url, ssl=False) as response:
        response.raise_for_status()
        return await response.json()


@profile
async def main():
    URL = "https://jsonplaceholder.typicode.com/posts"
    REQUEST_COUNT = 1000
    CONCURRENT_LIMIT = 50
    BATCH_SIZE = 100

    # Initialize local session to ensure closure after each batch
    async with aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(force_close=True, ttl_dns_cache=10)
    ) as session:
        semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)

        async def fetch_with_limit():
            async with semaphore:
                return await fetch(session, URL)

        for i in range(0, REQUEST_COUNT, BATCH_SIZE):
            tasks = [fetch_with_limit() for _ in range(BATCH_SIZE)]
            results = await asyncio.gather(*tasks)

            # Here you could process the results from each batch if needed
            logger.info(f"Batch {i // BATCH_SIZE + 1} completed with {len(results)} responses.")

            # Clean up after each batch
            tasks.clear()

    await session.close()
    del session
    gc.collect()
    return


async def test_httpx_view(request):
    # Fire-and-forget: the view returns immediately while main() runs in the background
    asyncio.create_task(main())
    return JsonResponse({'status': 'Success'}, status=200)
I ran memory profiling on the main() function, and the results show a significant memory increase after each batch is processed:
Line # Mem usage Increment Occurrences Line Contents
=============================================================
288 171.8 MiB 171.8 MiB 1 @profile
289 async def main():
290 171.8 MiB 0.0 MiB 1 URL = "https://jsonplaceholder.typicode.com/posts"
291 171.8 MiB 0.0 MiB 1 REQUEST_COUNT = 1000
292 171.8 MiB 0.0 MiB 1 CONCURRENT_LIMIT = 50
293 171.8 MiB 0.0 MiB 1 BATCH_SIZE = 100
294
295 # Initialize local session to ensure closure after each batch
296 171.8 MiB 0.0 MiB 2 async with aiohttp.ClientSession(
297 171.8 MiB 0.0 MiB 1 connector=aiohttp.TCPConnector(force_close=True, ttl_dns_cache=10)
298 171.8 MiB 0.0 MiB 1 ) as session:
299 171.8 MiB 0.0 MiB 1 semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)
300
301 191.0 MiB -18.9 MiB 1001 async def fetch_with_limit():
302 191.0 MiB -14.6 MiB 1500 async with semaphore:
303 191.0 MiB -101.7 MiB 5191 return await fetch(session, URL)
304
305 191.0 MiB 0.0 MiB 12 for i in range(0, REQUEST_COUNT, BATCH_SIZE):
306 191.0 MiB 0.0 MiB 1030 tasks = [fetch_with_limit() for _ in range(BATCH_SIZE)]
307 191.0 MiB -0.3 MiB 20 results = await asyncio.gather(*tasks)
308
309 # Here you could process the results from each batch if needed
310 191.0 MiB 0.0 MiB 10 logger.info(f"Batch {i // BATCH_SIZE + 1} completed with {len(results)} responses.")
311
312 # Clean up after each batch
313 191.0 MiB 0.0 MiB 10 tasks.clear()
314
315 191.0 MiB 0.0 MiB 1 await session.close()
316 191.0 MiB 0.0 MiB 1 del session
317 191.0 MiB 0.0 MiB 1 gc.collect()
318 191.0 MiB 0.0 MiB 1 return
INFO: 127.0.0.1:41392 - "GET /test_aiohttp/ HTTP/1.1" 200 OK
Line # Mem usage Increment Occurrences Line Contents
=============================================================
288 190.8 MiB 190.8 MiB 1 @profile
289 async def main():
290 190.8 MiB 0.0 MiB 1 URL = "https://jsonplaceholder.typicode.com/posts"
291 190.8 MiB 0.0 MiB 1 REQUEST_COUNT = 1000
292 190.8 MiB 0.0 MiB 1 CONCURRENT_LIMIT = 50
293 190.8 MiB 0.0 MiB 1 BATCH_SIZE = 100
294
295 # Initialize local session to ensure closure after each batch
296 190.8 MiB 0.0 MiB 2 async with aiohttp.ClientSession(
297 190.8 MiB 0.0 MiB 1 connector=aiohttp.TCPConnector(force_close=True, ttl_dns_cache=10)
298 190.8 MiB 0.0 MiB 1 ) as session:
299 190.8 MiB 0.0 MiB 1 semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)
300
301 191.4 MiB 0.0 MiB 1001 async def fetch_with_limit():
302 191.4 MiB 0.1 MiB 1500 async with semaphore:
303 191.4 MiB 0.5 MiB 5134 return await fetch(session, URL)
304
305 191.4 MiB 0.0 MiB 12 for i in range(0, REQUEST_COUNT, BATCH_SIZE):
306 191.4 MiB 0.0 MiB 1030 tasks = [fetch_with_limit() for _ in range(BATCH_SIZE)]
307 191.4 MiB 0.0 MiB 20 results = await asyncio.gather(*tasks)
308
309 # Here you could process the results from each batch if needed
310 191.4 MiB 0.0 MiB 10 logger.info(f"Batch {i // BATCH_SIZE + 1} completed with {len(results)} responses.")
311
312 # Clean up after each batch
313 191.4 MiB 0.0 MiB 10 tasks.clear()
314
315 191.4 MiB 0.0 MiB 1 await session.close()
316 191.4 MiB 0.0 MiB 1 del session
317 191.4 MiB 0.0 MiB 1 gc.collect()
318 191.4 MiB 0.0 MiB 1 return
INFO: 127.0.0.1:40942 - "GET /test_aiohttp/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:40942 - "GET /test_aiohttp/ HTTP/1.1" 200 OK
Line # Mem usage Increment Occurrences Line Contents
=============================================================
288 191.4 MiB 191.4 MiB 2 @profile
289 async def main():
290 191.4 MiB 0.0 MiB 2 URL = "https://jsonplaceholder.typicode.com/posts"
291 191.4 MiB 0.0 MiB 2 REQUEST_COUNT = 1000
292 191.4 MiB 0.0 MiB 2 CONCURRENT_LIMIT = 50
293 191.4 MiB 0.0 MiB 2 BATCH_SIZE = 100
294
295 # Initialize local session to ensure closure after each batch
296 191.4 MiB 0.0 MiB 4 async with aiohttp.ClientSession(
297 191.4 MiB 0.0 MiB 2 connector=aiohttp.TCPConnector(force_close=True, ttl_dns_cache=10)
298 191.4 MiB 0.0 MiB 2 ) as session:
299 191.4 MiB 0.0 MiB 2 semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)
300
301 206.5 MiB -72.7 MiB 1902 async def fetch_with_limit():
302 206.5 MiB -107.7 MiB 2850 async with semaphore:
303 206.5 MiB -364.5 MiB 9791 return await fetch(session, URL)
304
305 206.5 MiB -0.7 MiB 21 for i in range(0, REQUEST_COUNT, BATCH_SIZE):
306 206.5 MiB -74.8 MiB 1957 tasks = [fetch_with_limit() for _ in range(BATCH_SIZE)]
307 206.5 MiB -1.5 MiB 37 results = await asyncio.gather(*tasks)
308
309 # Here you could process the results from each batch if needed
310 206.5 MiB -0.7 MiB 18 logger.info(f"Batch {i // BATCH_SIZE + 1} completed with {len(results)} responses.")
311
312 # Clean up after each batch
313 206.5 MiB -0.7 MiB 18 tasks.clear()
314
315 206.5 MiB 0.0 MiB 1 await session.close()
316 206.5 MiB 0.0 MiB 1 del session
317 206.5 MiB 0.0 MiB 1 gc.collect()
318 206.5 MiB 0.0 MiB 1 return
Memory usage starts at around 171.8 MiB and grows with every batch processed. I've explicitly included garbage collection, yet memory usage keeps climbing. The async with statement ensures the ClientSession is closed after each batch, but that doesn't seem to release the memory effectively.
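Since memory_profiler only reports process RSS per line, it says little about which Python objects are actually accumulating. A tracemalloc snapshot diff between batches is often more informative. Below is a minimal diagnostic sketch; the profile_batches helper and its run_batch parameter are hypothetical, not part of the code above:

import tracemalloc

async def profile_batches(run_batch, batches=3):
    # Hypothetical helper: run a batch several times and print the
    # allocation sites that grew between consecutive snapshots.
    tracemalloc.start(25)  # keep 25 frames so tracebacks reach the real call sites
    previous = tracemalloc.take_snapshot()
    for n in range(batches):
        await run_batch()  # e.g. one round of asyncio.gather(*tasks)
        current = tracemalloc.take_snapshot()
        print(f"--- top allocation growth after batch {n + 1} ---")
        for stat in current.compare_to(previous, "lineno")[:5]:
            print(stat)
        previous = current
    tracemalloc.stop()

If the top entries keep pointing at aiohttp internals or at the closure created inside main(), that narrows down where the lingering references are held.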
I've tried Python versions 3.12.3, 3.10.15, and 3.9.2. For Python 3.9.2, the following library versions were used:
aiohttp: 3.10.10
multidict: 6.1.0
yarl: 1.17.1
async_timeout: 4.0.3
Question: What could be causing the memory leak in this setup? Are there any additional steps I can take to ensure proper memory management when using aiohttp in an async context?
Move fetch_with_limit out of the body of main, to module level. Every time main runs, it creates a new fetch_with_limit that is never deleted; I'd guess that's why you see the jump at that point. You could also inspect what remains after gc.collect() to best understand why the memory is not being released.
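A minimal sketch of that suggestion, assuming the fetch coroutine from the question: fetch_with_limit is defined once at module level and takes its dependencies as arguments, instead of capturing session and semaphore in a closure that is re-created on every call to main().

async def fetch_with_limit(semaphore, session, url):
    # Defined once at import time; no per-call closure over session/semaphore
    async with semaphore:
        return await fetch(session, url)

async def main():
    URL = "https://jsonplaceholder.typicode.com/posts"
    REQUEST_COUNT = 1000
    CONCURRENT_LIMIT = 50
    BATCH_SIZE = 100
    async with aiohttp.ClientSession() as session:
        semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)
        for i in range(0, REQUEST_COUNT, BATCH_SIZE):
            tasks = [fetch_with_limit(semaphore, session, URL) for _ in range(BATCH_SIZE)]
            results = await asyncio.gather(*tasks)
            logger.info(f"Batch {i // BATCH_SIZE + 1} completed with {len(results)} responses.")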