I'm trying to build a distributed semaphore on top of Redis for use in my Django application, to limit the number of concurrent requests to an API. I'm using redis-py with asyncio. Because I was getting "max number of clients reached" errors, I wanted a single connection pool shared across requests, so I created one in settings.py and use it from the semaphore class. However, as soon as I make concurrent requests I get the error got Future <Future pending> attached to a different loop.
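For context, the shared pool in settings.py is created along these lines (simplified; the exact options aren't important, only that one aioredis.ConnectionPool is built at import time and reused everywhere):

# settings.py (simplified)
import os
from redis import asyncio as aioredis

# One pool for the whole process, shared by every RedisSemaphore instance
REDIS_POOL = aioredis.ConnectionPool.from_url(
    os.environ["REDIS_URL"],
    max_connections=20,  # the cap here is arbitrary for illustration
)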
Here is the semaphore code:
import os
import uuid
import asyncio
import time
from typing import Any
import random

from django.conf import settings
from redis import asyncio as aioredis

STARTING_BACKOFF_S = 4
MAX_BACKOFF_S = 16


class SemaphoreTimeoutError(Exception):
    """Exception raised when a semaphore acquisition times out."""

    def __init__(self, message: str) -> None:
        super().__init__(message)


class RedisSemaphore:
    def __init__(
        self,
        key: str,
        max_locks: int,
        timeout: int = 30,
        wait_timeout: int = 30,
    ) -> None:
        """
        Initialize the RedisSemaphore.

        :param key: Redis key for the semaphore.
        :param max_locks: Maximum number of concurrent locks.
        :param timeout: How long until the lock should automatically be timed out in seconds.
        :param wait_timeout: How long to wait before aborting attempting to acquire a lock.
        """
        self.redis_url = os.environ["REDIS_URL"]
        self.key = key
        self.max_locks = max_locks
        self.timeout = timeout
        self.wait_timeout = wait_timeout
        # Reuse the connection pool created in settings.py
        self.redis = aioredis.Redis(connection_pool=settings.REDIS_POOL)
        self.identifier = "Only identifier"

    async def acquire(self) -> str:
        """
        Acquire a lock from the semaphore.

        :raises SemaphoreTimeoutError: If the semaphore acquisition times out.
        :return: The identifier for the acquired semaphore.
        """
        czset = f"{self.key}:owner"
        ctr = f"{self.key}:counter"
        identifier = str(uuid.uuid4())
        now = time.time()
        start_time = now
        backoff = STARTING_BACKOFF_S

        while True:
            # TODO: Redundant?
            if time.time() - start_time > self.wait_timeout:
                raise SemaphoreTimeoutError("Waited too long to acquire the semaphore.")

            async with self.redis.pipeline(transaction=True) as pipe:
                # Drop holders whose locks timed out, sync the owner zset,
                # and take a ticket from the counter.
                pipe.zremrangebyscore(self.key, "-inf", now - self.timeout)
                pipe.zinterstore(czset, {czset: 1, self.key: 0})
                pipe.incr(ctr)
                counter = (await pipe.execute())[-1]

                # Register this identifier and check its rank among contenders.
                pipe.zadd(self.key, {identifier: now})
                pipe.zadd(czset, {identifier: counter})
                pipe.zrank(czset, identifier)
                rank = (await pipe.execute())[-1]
                print(rank)
                if rank < self.max_locks:
                    return identifier

                # No slot available; remove ourselves again before backing off.
                pipe.zrem(self.key, identifier)
                pipe.zrem(czset, identifier)
                await pipe.execute()

            # Exponential backoff with randomness
            sleep_time = backoff * (1 + random.random() * 0.3)
            if (sleep_time + time.time() - start_time) > self.wait_timeout:
                raise SemaphoreTimeoutError("Waited too long to acquire the semaphore.")
            await asyncio.sleep(sleep_time)
            backoff = min(backoff * 2, MAX_BACKOFF_S)

    async def release(self, identifier: str) -> bool:
        """
        Release a lock from the semaphore.

        :param identifier: The identifier for the lock to be released.
        :return: True if the semaphore was properly released, False if it had timed out.
        """
        czset = f"{self.key}:owner"
        async with self.redis.pipeline(transaction=True) as pipe:
            pipe.zrem(self.key, identifier)
            pipe.zrem(czset, identifier)
            result = await pipe.execute()
        return result[0] > 0


class RedisSemaphoreContext:
    def __init__(self, semaphore: RedisSemaphore) -> None:
        """
        Initialize the RedisSemaphoreContext.

        :param semaphore: An instance of RedisSemaphore.
        """
        self.semaphore = semaphore
        self.identifier = None

    async def __aenter__(self) -> "RedisSemaphoreContext":
        """Enter the async context manager."""
        self.identifier = await self.semaphore.acquire()
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit the async context manager."""
        await self.semaphore.release(self.identifier)
Traceback:
File "/Users/.../app/fetchers.py", line 313, in get_page_with_semaphore
async with RedisSemaphoreContext(semaphore):
File "/Users/.../app/redis_semaphore.py", line 123, in __aexit__
await self.semaphore.release(self.identifier)
File "/Users/.../app/redis_semaphore.py", line 102, in release
result = await pipe.execute()
^^^^^^^^^^^^^^^^^^^^
File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/client.py", line 1528, in execute
return await conn.retry.call_with_retry(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/retry.py", line 59, in call_with_retry
return await do()
^^^^^^^^^^
File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/client.py", line 1371, in _execute_transaction
await self.parse_response(connection, "_")
File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/client.py", line 1464, in parse_response
result = await super().parse_response(connection, command_name, **options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/client.py", line 633, in parse_response
response = await connection.read_response()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/asyncio/connection.py", line 541, in read_response
response = await self._parser.read_response(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 82, in read_response
response = await self._read_response(disable_decoding=disable_decoding)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 90, in _read_response
raw = await self._readline()
^^^^^^^^^^^^^^^^^^^^^^
File "/opt/miniconda3/envs/cb-be/lib/python3.12/site-packages/redis/_parsers/base.py", line 219, in _readline
data = await self._stream.readline()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/miniconda3/envs/cb-be/lib/python3.12/asyncio/streams.py", line 568, in readline
line = await self.readuntil(sep)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/miniconda3/envs/cb-be/lib/python3.12/asyncio/streams.py", line 660, in readuntil
await self._wait_for_data('readuntil')
File "/opt/miniconda3/envs/cb-be/lib/python3.12/asyncio/streams.py", line 545, in _wait_for_data
await self._waiter
RuntimeError: Task <Task pending name='Task-111' coro=<GeneralWebPageFetcher.async_get_pages.<locals>.get_page_with_semaphore() running at /Users/app/fetchers.py:313> cb=[gather.<locals>._done_callback() at /opt/miniconda3/envs/cb-be/lib/python3.12/asyncio/tasks.py:767]> got Future <Future pending> attached to a different loop
I then use it from my adrf async views.
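Roughly, the call site in fetchers.py looks like this (simplified; the key name, lock limit, and fetch_page helper are placeholders, not the real code):

# fetchers.py (simplified)
semaphore = RedisSemaphore(key="api:fetch", max_locks=5)

async def get_page_with_semaphore(url: str) -> str:
    # Each concurrent fetch must hold one of the semaphore's slots
    async with RedisSemaphoreContext(semaphore):
        return await fetch_page(url)  # fetch_page stands in for the real fetcher

# Many of these are then run concurrently, e.g.:
# results = await asyncio.gather(*(get_page_with_semaphore(u) for u in urls))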
What am I doing wrong? Is this even possible?
The answer turned out to be completely unrelated: Django's runserver does not cope well with a mix of async and non-async endpoints. For whatever reason, the problem only surfaced once I started sharing the Redis connection pool through settings.py.
The fix was to run the app locally with uvicorn instead of runserver, which made the error go away. Another fix is to convert all endpoints to async.
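Concretely, that means starting the dev server with something like the following instead of manage.py runserver (myproject.asgi is a placeholder for your project's ASGI module):

uvicorn myproject.asgi:application --reload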