如何在 Django 中使用 Python 的异步 Redis 客户端(redis-py asyncio)?

问题描述 投票:0回答:1

我正在尝试使用 Redis 创建一个分布式信号量,供我的 Django 应用程序使用,目的是限制对某个 API 的并发请求。我使用的是 redis-py 的 asyncio 接口。由于遇到了“已达到最大客户端数”(max number of clients reached)错误,我想创建一个可以跨请求共享的连接池。因此,我在 settings.py 中创建了一个共享连接池,并在信号量类中使用它。但是,当我发出并发请求时,会收到错误 got Future <Future pending> attached to a different loop。这是我的代码:

import os
import uuid
import asyncio
import time
from typing import Any
import random
from django.conf import settings
from redis import asyncio as aioredis

# Retry backoff bounds, in seconds, read by RedisSemaphore.acquire.
# The delay starts at STARTING_BACKOFF_S, doubles after every failed attempt,
# and is capped at MAX_BACKOFF_S (random jitter is added on top when sleeping).
STARTING_BACKOFF_S = 4
MAX_BACKOFF_S = 16


class SemaphoreTimeoutError(Exception):
    """Signal that a semaphore slot could not be obtained within the wait budget."""

    def __init__(self, message: str) -> None:
        # Explicit initializer keeps ``message`` a required argument.
        Exception.__init__(self, message)


class RedisSemaphore:
    def __init__(
        self,
        key: str,
        max_locks: int,
        timeout: int = 30,
        wait_timeout: int = 30,
    ) -> None:
        """
        Initialize the RedisSemaphore.

        :param redis_url: URL of the Redis server.
        :param key: Redis key for the semaphore.
        :param max_locks: Maximum number of concurrent locks.
        :param timeout: How long until the lock should automatically be timed out in seconds.
        :param wait_timeout: How long to wait before aborting attempting to acquire a lock.
        """
        self.redis_url = os.environ["REDIS_URL"]
        self.key = key
        self.max_locks = max_locks
        self.timeout = timeout
        self.wait_timeout = wait_timeout
        self.redis = aioredis.Redis(connection_pool=settings.REDIS_POOL)
        self.identifier = "Only identifier"

    async def acquire(self) -> str:
        """
        Acquire a lock from the semaphore.

        :raises SemaphoreTimeoutError: If the semaphore acquisition times out.
        :return: The identifier for the acquired semaphore.
        """
        czset = f"{self.key}:owner"
        ctr = f"{self.key}:counter"
        identifier = str(uuid.uuid4())
        now = time.time()
        start_time = now
        backoff = STARTING_BACKOFF_S

        while True:
            # TODO: Redundant?
            if time.time() - start_time > self.wait_timeout:
                raise SemaphoreTimeoutError("Waited too long to acquire the semaphore.")

            async with self.redis.pipeline(transaction=True) as pipe:
                pipe.zremrangebyscore(self.key, "-inf", now - self.timeout)
                pipe.zinterstore(czset, {czset: 1, self.key: 0})
                pipe.incr(ctr)
                counter = (await pipe.execute())[-1]

                pipe.zadd(self.key, {identifier: now})
                pipe.zadd(czset, {identifier: counter})
                pipe.zrank(czset, identifier)
                rank = (await pipe.execute())[-1]

                print(rank)
                if rank < self.max_locks:
                    return identifier

                pipe.zrem(self.key, identifier)
                pipe.zrem(czset, identifier)
                await pipe.execute()

            # Exponential backoff with randomness
            sleep_time = backoff * (1 + random.random() * 0.3)
            if (sleep_time + time.time() - start_time) > self.wait_timeout:
                raise SemaphoreTimeoutError("Waited too long to acquire the semaphore.")
            await asyncio.sleep(sleep_time)
            backoff = min(backoff * 2, MAX_BACKOFF_S)

    async def release(self, identifier: str) -> bool:
        """
        Release a lock from the semaphore.

        :param identifier: The identifier for the lock to be released.
        :return: True if the semaphore was properly released, False if it had timed out.
        """
        czset = f"{self.key}:owner"
        async with self.redis.pipeline(transaction=True) as pipe:
            pipe.zrem(self.key, identifier)
            pipe.zrem(czset, identifier)
            result = await pipe.execute()
        return result[0] > 0


class RedisSemaphoreContext:
    """Async context manager that holds one semaphore slot for the span of a block."""

    def __init__(self, semaphore: RedisSemaphore) -> None:
        """
        Wrap a semaphore for use with ``async with``.

        :param semaphore: An instance of RedisSemaphore.
        """
        # Filled in by __aenter__ once a slot has been acquired.
        self.identifier = None
        self.semaphore = semaphore

    async def __aenter__(self) -> "RedisSemaphoreContext":
        """Acquire a slot, remember its identifier, and return this context."""
        token = await self.semaphore.acquire()
        self.identifier = token
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Release the slot taken on entry; exceptions from the block propagate."""
        held = self.identifier
        await self.semaphore.release(held)

回溯(Traceback):


  File "/Users/.../app/fetchers.py", line 313, in get_page_with_semaphore
    async with RedisSemaphoreContext(semaphore):
  File "/Users/.../app/redis_semaphore.py", line 123, in __aexit__
    await self.semaphore.release(self.identifier)
  File "/Users/.../app/redis_semaphore.py", line 102, in release
    result = await pipe.execute()
             ^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/client.py", line 1528, in execute
    return await conn.retry.call_with_retry(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/retry.py", line 59, in call_with_retry
    return await do()
           ^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/client.py", line 1371, in _execute_transaction
    await self.parse_response(connection, "_")
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/client.py", line 1464, in parse_response
    result = await super().parse_response(connection, command_name, **options)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/client.py", line 633, in parse_response
    response = await connection.read_response()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/connection.py", line 541, in read_response
    response = await self._parser.read_response(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 82, in read_response
    response = await self._read_response(disable_decoding=disable_decoding)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 90, in _read_response
    raw = await self._readline()
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/_parsers/base.py", line 219, in _readline
    data = await self._stream.readline()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/asyncio/streams.py", line 568, in readline
    line = await self.readuntil(sep)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/asyncio/streams.py", line 660, in readuntil
    await self._wait_for_data('readuntil')
  File "/opt/miniconda3/envs/cb-be/lib/python3.12/asyncio/streams.py", line 545, in _wait_for_data
    await self._waiter
RuntimeError: Task <Task pending name='Task-111' coro=<GeneralWebPageFetcher.async_get_pages.<locals>.get_page_with_semaphore() running at /Users/app/fetchers.py:313> cb=[gather.<locals>._done_callback() at /opt/miniconda3/envs/cb-be/lib/python3.12/asyncio/tasks.py:767]> got Future <Future pending> attached to a different loop

然后我在我的 adrf 异步视图中使用它。

我做错了什么?这可能吗?

python django redis python-asyncio
1个回答
0
投票

结果发现,问题的原因与上面的代码完全无关。实际上,Django 的 runserver 在同时存在异步和非异步端点时表现不佳。出于某种原因,只有当我通过 settings.py 共享 Redis 连接池时,这个问题才会出现。

解决方法是在本地改用 uvicorn 运行应用,这样问题就消失了。另一种解决方法是把所有端点都改为异步。

© www.soinside.com 2019 - 2024. All rights reserved.