Memory leak in asynchronous HTTP requests with aiohttp in Django

Problem description

I'm running into a memory leak in a Django application that makes asynchronous HTTP requests with aiohttp. I set up a function to fetch data from a REST API, but despite my attempts to manage memory carefully, I observe a significant increase in memory usage with each batch of requests processed.

Here is a simplified version of my code:

    import logging
    from django.http import JsonResponse
    import aiohttp
    import asyncio
    import gc
    from memory_profiler import profile

    logger = logging.getLogger(__name__)

    async def fetch(session, url):
        async with session.get(url, ssl=False) as response:
            response.raise_for_status()
            return await response.json()

    @profile
    async def main():
        URL = "https://jsonplaceholder.typicode.com/posts"
        REQUEST_COUNT = 1000
        CONCURRENT_LIMIT = 50  
        BATCH_SIZE = 100

        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(force_close=True, ttl_dns_cache=10)) as session:
            semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)

            async def fetch_with_limit():
                async with semaphore:
                    return await fetch(session, URL)

            for i in range(0, REQUEST_COUNT, BATCH_SIZE):
                tasks = [fetch_with_limit() for _ in range(BATCH_SIZE)]
                results = await asyncio.gather(*tasks)
                logger.info(f"Batch {i // BATCH_SIZE + 1} completed with {len(results)} responses.")
                tasks.clear()

        await session.close()
        del session
        gc.collect()
        return

    async def test_httpx_view(request):
        asyncio.create_task(main())
        return JsonResponse({'status': 'Success'}, status=200)

Profiling results

I ran a memory profile on the main() function, and the results show a noticeable memory increase after each batch is processed:


    Line #    Mem usage    Increment  Occurrences   Line Contents
    =============================================================
    288    171.8 MiB    171.8 MiB           1   @profile
    289                                         async def main():
    290    171.8 MiB      0.0 MiB           1       URL = "https://jsonplaceholder.typicode.com/posts"
    291    171.8 MiB      0.0 MiB           1       REQUEST_COUNT = 1000
    292    171.8 MiB      0.0 MiB           1       CONCURRENT_LIMIT = 50  
    293    171.8 MiB      0.0 MiB           1       BATCH_SIZE = 100
    294                                         
    295                                             # Initialize local session to ensure closure after each batch
    296    171.8 MiB      0.0 MiB           2       async with aiohttp.ClientSession(
    297    171.8 MiB      0.0 MiB           1           connector=aiohttp.TCPConnector(force_close=True, ttl_dns_cache=10)
    298    171.8 MiB      0.0 MiB           1       ) as session:
    299    171.8 MiB      0.0 MiB           1           semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)
    300                                         
    301    191.0 MiB    -18.9 MiB        1001           async def fetch_with_limit():
    302    191.0 MiB    -14.6 MiB        1500               async with semaphore:
    303    191.0 MiB   -101.7 MiB        5191                   return await fetch(session, URL)
    304                                         
    305    191.0 MiB      0.0 MiB          12           for i in range(0, REQUEST_COUNT, BATCH_SIZE):
    306    191.0 MiB      0.0 MiB        1030               tasks = [fetch_with_limit() for _ in range(BATCH_SIZE)]
    307    191.0 MiB     -0.3 MiB          20               results = await asyncio.gather(*tasks)
    308                                         
    309                                                     # Here you could process the results from each batch if needed
    310    191.0 MiB      0.0 MiB          10               logger.info(f"Batch {i // BATCH_SIZE + 1} completed with {len(results)} responses.")
    311                                         
    312                                                     # Clean up after each batch
    313    191.0 MiB      0.0 MiB          10               tasks.clear()
    314                                         
    315    191.0 MiB      0.0 MiB           1       await session.close()
    316    191.0 MiB      0.0 MiB           1       del session
    317    191.0 MiB      0.0 MiB           1       gc.collect()
    318    191.0 MiB      0.0 MiB           1       return


    INFO:     127.0.0.1:41392 - "GET /test_aiohttp/ HTTP/1.1" 200 OK

    Line #    Mem usage    Increment  Occurrences   Line Contents
    =============================================================
    288    190.8 MiB    190.8 MiB           1   @profile
    289                                         async def main():
    290    190.8 MiB      0.0 MiB           1       URL = "https://jsonplaceholder.typicode.com/posts"
    291    190.8 MiB      0.0 MiB           1       REQUEST_COUNT = 1000
    292    190.8 MiB      0.0 MiB           1       CONCURRENT_LIMIT = 50  
    293    190.8 MiB      0.0 MiB           1       BATCH_SIZE = 100
    294                                         
    295                                             # Initialize local session to ensure closure after each batch
    296    190.8 MiB      0.0 MiB           2       async with aiohttp.ClientSession(
    297    190.8 MiB      0.0 MiB           1           connector=aiohttp.TCPConnector(force_close=True, ttl_dns_cache=10)
    298    190.8 MiB      0.0 MiB           1       ) as session:
    299    190.8 MiB      0.0 MiB           1           semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)
    300                                         
    301    191.4 MiB      0.0 MiB        1001           async def fetch_with_limit():
    302    191.4 MiB      0.1 MiB        1500               async with semaphore:
    303    191.4 MiB      0.5 MiB        5134                   return await fetch(session, URL)
    304                                         
    305    191.4 MiB      0.0 MiB          12           for i in range(0, REQUEST_COUNT, BATCH_SIZE):
    306    191.4 MiB      0.0 MiB        1030               tasks = [fetch_with_limit() for _ in range(BATCH_SIZE)]
    307    191.4 MiB      0.0 MiB          20               results = await asyncio.gather(*tasks)
    308                                         
    309                                                     # Here you could process the results from each batch if needed
    310    191.4 MiB      0.0 MiB          10               logger.info(f"Batch {i // BATCH_SIZE + 1} completed with {len(results)} responses.")
    311                                         
    312                                                     # Clean up after each batch
    313    191.4 MiB      0.0 MiB          10               tasks.clear()
    314                                         
    315    191.4 MiB      0.0 MiB           1       await session.close()
    316    191.4 MiB      0.0 MiB           1       del session
    317    191.4 MiB      0.0 MiB           1       gc.collect()
    318    191.4 MiB      0.0 MiB           1       return


    INFO:     127.0.0.1:40942 - "GET /test_aiohttp/ HTTP/1.1" 200 OK
    INFO:     127.0.0.1:40942 - "GET /test_aiohttp/ HTTP/1.1" 200 OK

    Line #    Mem usage    Increment  Occurrences   Line Contents
    =============================================================
    288    191.4 MiB    191.4 MiB           2   @profile
    289                                         async def main():
    290    191.4 MiB      0.0 MiB           2       URL = "https://jsonplaceholder.typicode.com/posts"
    291    191.4 MiB      0.0 MiB           2       REQUEST_COUNT = 1000
    292    191.4 MiB      0.0 MiB           2       CONCURRENT_LIMIT = 50  
    293    191.4 MiB      0.0 MiB           2       BATCH_SIZE = 100
    294                                         
    295                                             # Initialize local session to ensure closure after each batch
    296    191.4 MiB      0.0 MiB           4       async with aiohttp.ClientSession(
    297    191.4 MiB      0.0 MiB           2           connector=aiohttp.TCPConnector(force_close=True, ttl_dns_cache=10)
    298    191.4 MiB      0.0 MiB           2       ) as session:
    299    191.4 MiB      0.0 MiB           2           semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)
    300                                         
    301    206.5 MiB    -72.7 MiB        1902           async def fetch_with_limit():
    302    206.5 MiB   -107.7 MiB        2850               async with semaphore:
    303    206.5 MiB   -364.5 MiB        9791                   return await fetch(session, URL)
    304                                         
    305    206.5 MiB     -0.7 MiB          21           for i in range(0, REQUEST_COUNT, BATCH_SIZE):
    306    206.5 MiB    -74.8 MiB        1957               tasks = [fetch_with_limit() for _ in range(BATCH_SIZE)]
    307    206.5 MiB     -1.5 MiB          37               results = await asyncio.gather(*tasks)
    308                                         
    309                                                     # Here you could process the results from each batch if needed
    310    206.5 MiB     -0.7 MiB          18               logger.info(f"Batch {i // BATCH_SIZE + 1} completed with {len(results)} responses.")
    311                                         
    312                                                     # Clean up after each batch
    313    206.5 MiB     -0.7 MiB          18               tasks.clear()
    314                                         
    315    206.5 MiB      0.0 MiB           1       await session.close()
    316    206.5 MiB      0.0 MiB           1       del session
    317    206.5 MiB      0.0 MiB           1       gc.collect()
    318    206.5 MiB      0.0 MiB           1       return

Observations

Memory usage starts at around 171.8 MiB and grows with every batch processed. I have explicitly included garbage collection, yet memory usage keeps climbing. The async with statement ensures the ClientSession gets closed, but that does not appear to release the memory.
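
For reference, here is how a tracemalloc snapshot diff could be wired around main() to see which call sites are actually retaining memory, rather than relying on memory_profiler's per-line totals. This is a diagnostic sketch, not part of the profiled code, and main_with_snapshots is a hypothetical wrapper name:

    import tracemalloc

    async def main_with_snapshots():
        # Take an allocation snapshot before and after the batched fetches,
        # then diff the two to see which call sites still hold memory once
        # main() has returned.
        tracemalloc.start()
        before = tracemalloc.take_snapshot()

        await main()  # the batched aiohttp fetches from above

        after = tracemalloc.take_snapshot()
        # Print the ten call sites with the largest retained allocations.
        for stat in after.compare_to(before, "lineno")[:10]:
            print(stat)
        tracemalloc.stop()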

Additional information

I have tried Python versions 3.12.3, 3.10.15, and 3.9.2. For Python 3.9.2, the following library versions were used:

aiohttp: 3.10.10
multidict: 6.1.0
yarl: 1.17.1
async_timeout: 4.0.3

Questions

What could be causing the memory leak in this setup? What other steps can I take to ensure proper memory management when using aiohttp in an async context?

python aiohttp
1 Answer
  1. Move data that is only assigned once, such as the semaphore and the constants, to the module body.
  2. Likewise, move fetch_with_limit to the module body. Every time main runs, it creates a new fetch_with_limit that is never deleted; my guess is that this is why you see the jump at that point. (See the sketch after this list.)
  3. I can't give a concrete justification, but programmer's intuition tells me you should call gc.collect() at both the start and the end of the function, to best understand why the memory is not being released.
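
A minimal sketch of the refactor suggested in points 1 and 2, keeping the question's constants. fetch_with_limit takes the session and semaphore as parameters instead of closing over them, so main() no longer builds a fresh closure per run; the semaphore itself stays inside main() because on Python versions before 3.10 an asyncio.Semaphore created at import time can bind to the wrong event loop. This is an illustrative sketch, not a guaranteed fix for the leak:

    import asyncio
    import aiohttp

    # Constants assigned once, at module level.
    URL = "https://jsonplaceholder.typicode.com/posts"
    REQUEST_COUNT = 1000
    CONCURRENT_LIMIT = 50
    BATCH_SIZE = 100

    async def fetch(session, url):
        async with session.get(url, ssl=False) as response:
            response.raise_for_status()
            return await response.json()

    # Defined once at module level. session and semaphore are passed in
    # rather than captured from main(), so no new closure is created on
    # each run of main().
    async def fetch_with_limit(session, semaphore):
        async with semaphore:
            return await fetch(session, URL)

    async def main():
        async with aiohttp.ClientSession(
            connector=aiohttp.TCPConnector(force_close=True, ttl_dns_cache=10)
        ) as session:
            # Created here rather than at import time so it binds to the
            # running event loop on older Python versions.
            semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)
            for _ in range(0, REQUEST_COUNT, BATCH_SIZE):
                tasks = [fetch_with_limit(session, semaphore)
                         for _ in range(BATCH_SIZE)]
                results = await asyncio.gather(*tasks)
                del results  # drop batch references before the next iteration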