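I want to build something like a context-switching mechanism that lets a shared resource be used by only one context at a time.
In my case the resource is a single session object connected to a website, and each context is the account currently logged in to that site. The system imposes two constraints: two accounts cannot be connected at the same time, and connecting is an expensive operation.
I designed the following mechanism, which releases the context (so the account can be switched) during expensive background operations: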
import asyncio

counter = 0  # i.e. ContextVar for current account

def increase_counter():
    global counter
    counter += 1

def decrease_counter():
    global counter
    counter -= 1

async def run_operation():
    while True:
        operation = operation_queue.get()
        increase_counter()
        task = asyncio.create_task(operation())
        task.add_done_callback(lambda fut: decrease_counter())
        await wait_for_free()  # wait until counter == 0
        switch_context()  # Log in to a different account

async def operation():
    # do stuff
    decrease_counter()
    await asyncio.sleep(60)  # Long background operation (must not block the event loop)
    await wait_for_context()  # Wait for context to come back and be 1.
    # continue
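This mechanism works well when there is only a single chain of operations. An inner function can always release the counter and take it back once its background operation has finished.
Unfortunately, it stops working when two operations run concurrently: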
async def operation():
    increase_counter()
    task1 = asyncio.create_task(sub_op())
    task1.add_done_callback(lambda fut: decrease_counter())
    increase_counter()
    task2 = asyncio.create_task(sub_op())
    task2.add_done_callback(lambda fut: decrease_counter())
    decrease_counter()  # release while waiting on the sub-operations
    await asyncio.gather(task1, task2)
    await wait_for_context()

async def sub_op():
    decrease_counter()
    await asyncio.sleep(60)
    await wait_for_context()
In this case the counter evolves as follows:
run_op (+1 = 1)
task1_creation (+1 = 2)
task2_creation (+1 = 3)
gather_release (-1 = 2)
task1_sleep (-1 = 1)
task2_sleep (-1 = 0) # Context released
task1_resume (+1 = 1)
task2_resume (+1 = 2)
task1_done_callback (-1 = 1)
task2_done_callback (-1 = 0) # Context needlessly released
gather_resume (+1 = 1)
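The arithmetic in the trace above can be checked by replaying it. The following is only a small sketch: the event names and deltas are copied from the trace, while `EVENTS` and `find_releases` are hypothetical names introduced here for illustration. It reports every event at which the counter reaches zero, i.e. every point where the context would be released.

```python
# Replay of the counter trace: (event name, delta) pairs copied from it.
EVENTS = [
    ("run_op", +1),
    ("task1_creation", +1),
    ("task2_creation", +1),
    ("gather_release", -1),
    ("task1_sleep", -1),
    ("task2_sleep", -1),
    ("task1_resume", +1),
    ("task2_resume", +1),
    ("task1_done_callback", -1),
    ("task2_done_callback", -1),
    ("gather_resume", +1),
]


def find_releases(events):
    """Return the names of the events at which the counter drops to zero."""
    counter = 0
    releases = []
    for name, delta in events:
        counter += delta
        if counter == 0:
            releases.append(name)
    return releases


if __name__ == "__main__":
    # Two releases are reported: the intended one and the needless one.
    print(find_releases(EVENTS))
```

The first release (at `task2_sleep`) is the intended one; the second (at `task2_done_callback`) happens only because both done-callbacks fire before the parent has re-acquired the context after `gather`.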
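Is it possible to prevent this needless release with this mechanism, or is it just a fact of life? If it is unavoidable, is there a mechanism other than a counter that would solve the problem?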
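In class Context you can modify the __init__ method to perform any initialization you need, and modify acquire to add whatever code is required to "activate" the context (for example, updating the account information in the shared session instance).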
import asyncio
from collections import deque
import random
import time

class Context:
    """Context-specific information"""

    # Shared by all instances: the current owner and a FIFO queue of waiters
    _current_context = None
    _waiting_contexts = deque()

    def __init__(self):
        self._event = asyncio.Event()
        ...  # Any other initializations

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_t, exc_v, exc_tb):
        self.release()

    async def acquire(self):
        if Context._current_context is None:
            # No waiting required
            Context._current_context = self
            return

        # Add this context to list of those waiting:
        self._waiting_contexts.append(self)
        print(f'{self} is waiting for acquisition at {time.time()}')
        await self._event.wait()  # Wait until we acquire context
        self._event.clear()  # For next time

    def release(self):
        assert Context._current_context is self

        Context._current_context = None
        if self._waiting_contexts:  # Someone is waiting
            context = self._waiting_contexts.popleft()
            Context._current_context = context
            context._event.set()  # Wake up task

async def sub_op(id):
    # Create and initialize your context
    context = Context()
    print(f'sub_op({id})context is {context}')

    for _ in range(3):
        # Acquire the context:
        print(f'sub_op({id}) is acquiring its context at {time.time()}')
        async with context:
            print(f'sub_op({id}) working with context {context} at {time.time()}')
            # Do some work:
            await asyncio.sleep(random.randint(1, 3))
        # Context is released
        print(f'sub_op({id}) has released its context at {time.time()}')
        # Do other stuff
        await asyncio.sleep(random.randint(1, 3))
        print(f'sub_op({id}) is doing other stuff without the context')
    print(f'sub_op({id})) finishing')

async def create_and_run_tasks():
    tasks = [
        sub_op(i) for i in range(1, 4)
    ]
    await asyncio.gather(*tasks)
    print('All tasks completed.')

asyncio.run(create_and_run_tasks())
Prints:
sub_op(1)context is <__main__.Context object at 0x000001D839A8B710>
sub_op(1) is acquiring its context at 1727807143.389532
sub_op(1) working with context <__main__.Context object at 0x000001D839A8B710> at 1727807143.389532
sub_op(2)context is <__main__.Context object at 0x000001D839AA6450>
sub_op(2) is acquiring its context at 1727807143.389532
<__main__.Context object at 0x000001D839AA6450> is waiting for acquisition at 1727807143.389532
sub_op(3)context is <__main__.Context object at 0x000001D839AA6390>
sub_op(3) is acquiring its context at 1727807143.3905165
<__main__.Context object at 0x000001D839AA6390> is waiting for acquisition at 1727807143.3905165
sub_op(1) has released its context at 1727807145.3982353
sub_op(2) working with context <__main__.Context object at 0x000001D839AA6450> at 1727807145.3992374
sub_op(2) has released its context at 1727807147.4108717
sub_op(3) working with context <__main__.Context object at 0x000001D839AA6390> at 1727807147.4108717
sub_op(1) is doing other stuff without the context
sub_op(1) is acquiring its context at 1727807148.4050503
<__main__.Context object at 0x000001D839A8B710> is waiting for acquisition at 1727807148.4050503
sub_op(2) is doing other stuff without the context
sub_op(2) is acquiring its context at 1727807150.421186
<__main__.Context object at 0x000001D839AA6450> is waiting for acquisition at 1727807150.4221883
sub_op(3) has released its context at 1727807150.4221883
sub_op(1) working with context <__main__.Context object at 0x000001D839A8B710> at 1727807150.4231749
sub_op(3) is doing other stuff without the context
sub_op(3) is acquiring its context at 1727807152.4348192
<__main__.Context object at 0x000001D839AA6390> is waiting for acquisition at 1727807152.4348192
sub_op(1) has released its context at 1727807153.4338317
sub_op(2) working with context <__main__.Context object at 0x000001D839AA6450> at 1727807153.4347506
sub_op(1) is doing other stuff without the context
sub_op(1) is acquiring its context at 1727807156.4415817
<__main__.Context object at 0x000001D839A8B710> is waiting for acquisition at 1727807156.4430528
sub_op(2) has released its context at 1727807156.4430528
sub_op(3) working with context <__main__.Context object at 0x000001D839AA6390> at 1727807156.4440517
sub_op(2) is doing other stuff without the context
sub_op(2) is acquiring its context at 1727807157.446158
<__main__.Context object at 0x000001D839AA6450> is waiting for acquisition at 1727807157.446158
sub_op(3) has released its context at 1727807159.4486222
sub_op(1) working with context <__main__.Context object at 0x000001D839A8B710> at 1727807159.45161
sub_op(3) is doing other stuff without the context
sub_op(3) is acquiring its context at 1727807162.4659755
<__main__.Context object at 0x000001D839AA6390> is waiting for acquisition at 1727807162.4669642
sub_op(1) has released its context at 1727807162.4669642
sub_op(2) working with context <__main__.Context object at 0x000001D839AA6450> at 1727807162.467968
sub_op(1) is doing other stuff without the context
sub_op(1)) finishing
sub_op(2) has released its context at 1727807165.4751408
sub_op(3) working with context <__main__.Context object at 0x000001D839AA6390> at 1727807165.4751408
sub_op(2) is doing other stuff without the context
sub_op(2)) finishing
sub_op(3) has released its context at 1727807168.469866
sub_op(3) is doing other stuff without the context
sub_op(3)) finishing
All tasks completed.
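To make the suggestion about modifying __init__ and acquire more concrete, here is a minimal sketch of how the Context class above might be adapted to the account-switching case. Everything specific to the website is assumed: `FakeSession`, its `login` method, and the `AccountContext` name are hypothetical stand-ins, and the real login call will look different. The one design choice added here, skipping the login when the session is already on the right account, is also an assumption based on connecting being expensive.

```python
import asyncio
from collections import deque


class FakeSession:
    """Hypothetical stand-in for the single shared website session."""

    def __init__(self):
        self.account = None
        self.logins = 0  # counts how many (expensive) logins were performed

    async def login(self, account):
        await asyncio.sleep(0)  # placeholder for the real, costly login
        self.account = account
        self.logins += 1


class AccountContext:
    """Same FIFO hand-off as Context, plus an account switch on acquire."""

    _current = None
    _waiting = deque()

    def __init__(self, session, account):
        self._event = asyncio.Event()
        self.session = session
        self.account = account

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_t, exc_v, exc_tb):
        self.release()

    async def acquire(self):
        cls = AccountContext
        if cls._current is None:
            cls._current = self
        else:
            cls._waiting.append(self)
            await self._event.wait()  # woken by release() of the owner
            self._event.clear()
        # We own the session now; log in only if the account changed.
        if self.session.account != self.account:
            await self.session.login(self.account)

    def release(self):
        cls = AccountContext
        assert cls._current is self
        cls._current = None
        if cls._waiting:
            nxt = cls._waiting.popleft()
            cls._current = nxt  # hand ownership over before waking it
            nxt._event.set()


async def worker(session, account, log):
    ctx = AccountContext(session, account)
    for _ in range(2):
        async with ctx:
            # While we hold the context, the session must be on our account.
            assert session.account == account
            log.append(account)
            await asyncio.sleep(0.01)  # work that needs the session
        await asyncio.sleep(0.01)  # background work without the session


async def main():
    session = FakeSession()
    log = []
    await asyncio.gather(
        worker(session, "alice", log),
        worker(session, "bob", log),
    )
    return session, log


if __name__ == "__main__":
    session, log = asyncio.run(main())
    print(log, session.logins)
```

Because ownership is assigned to the next waiter inside release, no other task can slip in between the hand-off and the login, so the assertion inside `worker` holds for as long as the context is held.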