我正在我的一个项目中使用
aiohttp
,并希望限制每秒发出的请求数量。我正在使用 asyncio.Semaphore
来做到这一点。我的挑战是我可能想增加/减少每秒允许的请求数。
例如:
limit = asyncio.Semaphore(10)
async with limit:
async with aiohttp.request(...)
...
await asyncio.sleep(1)
这个效果很好。也就是说,它将
aiohttp.request
限制为每秒 10 个并发请求。但是,我可能想增加和减少 Semaphore._value
。我可以做到 limit._value = 20
但我不确定这是否是正确的方法或者还有其他方法可以做到这一点。
访问私有
_value
属性不是正确的方法,至少有两个原因:一是该属性是私有的,可以在未来版本中删除、重命名或更改含义,恕不另行通知,二是增加限制会赢得胜利不会被已经有服务员的信号量注意到。
由于
asyncio.Semaphore
不支持动态修改限制,因此您有两种选择:实现支持它的自己的 Semaphore
类,或者根本不使用 Semaphore
。后者可能更容易,因为您始终可以用通过队列接收作业的固定数量的工作任务来替换信号量强制限制。假设您当前的代码如下所示:
async def fetch(limit, arg):
async with limit:
# your actual code here
return result
async def tweak_limit(limit):
# here you'd like to be able to increase the limit
async def main():
limit = asyncio.Semaphore(10)
asyncio.create_task(tweak_limit(limit))
results = await asyncio.gather(*[fetch(limit, x) for x in range(1000)])
你可以通过提前创建工人并给他们工作来表达它,而无需信号量:
async def fetch_task(queue, results):
while True:
arg = await queue.get()
# your actual code here
results.append(result)
queue.task_done()
async def main():
# fill the queue with jobs for the workers
queue = asyncio.Queue()
for x in range(1000):
await queue.put(x)
# create the initial pool of workers
results = []
workers = [asyncio.create_task(fetch_task(queue, results))
for _ in range(10)]
asyncio.create_task(tweak_limit(workers, queue, results))
# wait for workers to process the entire queue
await queue.join()
# finally, cancel the now-idle worker tasks
for w in workers:
w.cancel()
# results are now available
tweak_limit()
函数现在可以通过生成新工人来增加限制:
async def tweak_limit(workers, queue, results):
while True:
await asyncio.sleep(1)
if need_more_workers:
workers.append(asyncio.create_task(fetch_task(queue, results)))
使用worker和队列是一个更复杂的解决方案,你必须考虑诸如设置、拆卸、异常处理和背压等问题。
信号量可以用 Lock 来实现,如果你不介意效率低下(你会明白为什么),这里有一个动态值信号量的简单实现:
class DynamicSemaphore:
def __init__(self, value=1):
self._lock = asyncio.Lock()
if value < 0:
raise ValueError("Semaphore initial value must be >= 0")
self.value = value
async def __aenter__(self):
await self.acquire()
return None
async def __aexit__(self, exc_type, exc, tb):
self.release()
def locked(self):
return self.value == 0
async def acquire(self):
async with self._lock:
while self.value <= 0:
await asyncio.sleep(0.1)
self.value -= 1
return True
def release(self):
self.value += 1
实际上有一种方法可以仅使用公共 API 来做到这一点。为信号量提供支持的底层技术只是一个计数器,用于跟踪某些资源的空闲“槽”数量(在本例中为可以同时发出的请求数量)。当您尝试在
limit
内部使用名为 with:
的信号量时,会调用 limit.acquire()
函数,信号量会一直等到计数器大于零(表明至少有一个空闲槽),然后减一从柜台(标记我们正在使用这些插槽之一)。当您使用完它时,它会调用 limit.release()
,这会为计数器加一。值得注意的是,我们还可以手动调用 limit.release
,甚至可以比最初创建信号量时增加计数器更多次。1
例如:
limit = asyncio.Semaphore(10)
for i in range(10):
limit.release()
# limit now has 20 slots
# ...
# you can also decrement
for i in range(5):
await limit.acquire()
# limit now has 15 slots
不幸的是,由于每次想要增加计数器时都必须调用该方法一次,因此如果您想要大量更改限制,这将不会有效,并且其他答案之一可能仍然更好。但是,请知道使用 API 是可以实现的。
1 正是因为这个原因,还存在BoundedSemaphore,旨在防止意外更改信号量的值可能发生的错误。