在asyncio中实现上下文切换机制

问题描述 投票:0回答:1

我希望创建类似于上下文切换机制的东西,允许一次使用一个共享资源。

在我的例子中,它是连接到网站的单个会话对象,每个上下文都是连接到网站的当前帐户。作为系统限制 - 不能同时连接两个帐户,并且连接是一项成本高昂的操作。

我设计了以下机制,其中涉及在昂贵的后台操作期间释放上下文(切换帐户):

counter = 0  # i.e. ContextVar for current account
def increase_counter():
  global counter
  counter += 1

def decrease_counter():
  global counter
  counter -= 1

async def run_operation():
  while True:
    operation = operation_queue.get()
    increase_counter()

    task = asyncio.create_task(operation())
    task.add_done_callback(lambda fut: decrease_counter())

    await wait_for_free() # wait until counter == 0
    switch_context()  # Log in to a different account

async def operation():
  # do stuff
  decrease_counter()
  sleep(60) # Long background operation
  await wait_for_context()  # Wait for context to come back and be 1.
  # continue

当只有一条操作链时,这种机制运作良好。内部函数总是可以释放计数器并在操作结束后将其收回。

不幸的是,当同时运行两个操作时,它会停止工作:

async def operation():
  increase_counter()
  task1 = asyncio.create_task(sub_op())
  task1.add_done_callback(decrease)
  increase_counter()
  task2 = asyncio.create_task(sub_op())
  task2.add_done_callback(decrease)

  decrease_counter()
  await asyncio.gather(task1, task2)
  await wait_for_context()

async def sub_op():
  decrease_counter()
  sleep(60)
  await wait_for_context()

这种情况下的计数器前进如下:

run_op (+1 = 1)
task1_creation (+1 = 2)
task2_creation (+1 = 3)
gather_release (-1 = 2)
task1_sleep (-1 = 1)
task2_sleep (-1 = 0)  # Context released
task1_resume (+1 = 1)
task2_resume (+1 = 2)
task1_done_callback (-1 = 1)
task2_done_callback (-1 = 0)  # Context needlessly released
gather_resume (+1 = 1)

使用这种机制是否可以防止不必要的释放,或者这只是生活中的事实?如果是这样,除了使用计数器之外,还有其他机制可以解决这个问题吗?

python concurrency architecture python-asyncio context-switch
1个回答
0
投票

在类

Context
中,您可以修改
__init__
方法来执行所需的任何初始化操作,并修改
acquire
以添加“激活”上下文所需的任何代码(例如在共享会话实例中更新帐户信息) ).

import asyncio
from collections import deque
import random
import time

class Context:
    """Context-specific information"""

    _current_context = None
    _waiting_contexts = deque()

    def __init__(self):
        self._event = asyncio.Event()
        ...  # Any other initializations

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_t, exc_v, exc_tb):
        self.release()

    async def acquire(self):
        if Context._current_context is None:
            # No waiting required
            Context._current_context = self
            return

        # Add this context to list of those waiting:
        self._waiting_contexts.append(self)
        print(f'{self} is waiting for acquisition at {time.time()}')
        await self._event.wait()  # Wait until we acquire context
        self._event.clear() # For next time


    def release(self):
        assert Context._current_context is self
        Context._current_context = None
        if self._waiting_contexts: # Someone is waiting
            context = self._waiting_contexts.popleft()
            Context._current_context = context
            context._event.set()  # Wake up task

async def sub_op(id):
    # Create and initialize your context
    context = Context()
    print(f'sub_op({id})context is {context}')

    for _ in range(3):
        # Acquire the context:
        print(f'sub_op({id}) is acquiring its context at {time.time()}')
        async with context:
            print(f'sub_op({id}) working with context {context} at {time.time()}')

            # Do some work:
            await asyncio.sleep(random.randint(1, 3))

        # Context is released
        print(f'sub_op({id}) has released its context at {time.time()}')

        # Do other stuff
        await asyncio.sleep(random.randint(1, 3))
        print(f'sub_op({id}) is doing other stuff without the context')

    print(f'sub_op({id})) finishing')


async def create_and_run_tasks():
    tasks = [
        sub_op(i) for i in range(1, 4)
    ]

    await asyncio.gather(*tasks)

    print('All tasks completed.')

asyncio.run(create_and_run_tasks())

打印:

sub_op(1)context is <__main__.Context object at 0x000001D839A8B710>
sub_op(1) is acquiring its context at 1727807143.389532
sub_op(1) working with context <__main__.Context object at 0x000001D839A8B710> at 1727807143.389532
sub_op(2)context is <__main__.Context object at 0x000001D839AA6450>
sub_op(2) is acquiring its context at 1727807143.389532
<__main__.Context object at 0x000001D839AA6450> is waiting for acquisition at 1727807143.389532
sub_op(3)context is <__main__.Context object at 0x000001D839AA6390>
sub_op(3) is acquiring its context at 1727807143.3905165
<__main__.Context object at 0x000001D839AA6390> is waiting for acquisition at 1727807143.3905165
sub_op(1) has released its context at 1727807145.3982353
sub_op(2) working with context <__main__.Context object at 0x000001D839AA6450> at 1727807145.3992374
sub_op(2) has released its context at 1727807147.4108717
sub_op(3) working with context <__main__.Context object at 0x000001D839AA6390> at 1727807147.4108717
sub_op(1) is doing other stuff without the context
sub_op(1) is acquiring its context at 1727807148.4050503
<__main__.Context object at 0x000001D839A8B710> is waiting for acquisition at 1727807148.4050503
sub_op(2) is doing other stuff without the context
sub_op(2) is acquiring its context at 1727807150.421186
<__main__.Context object at 0x000001D839AA6450> is waiting for acquisition at 1727807150.4221883
sub_op(3) has released its context at 1727807150.4221883
sub_op(1) working with context <__main__.Context object at 0x000001D839A8B710> at 1727807150.4231749
sub_op(3) is doing other stuff without the context
sub_op(3) is acquiring its context at 1727807152.4348192
<__main__.Context object at 0x000001D839AA6390> is waiting for acquisition at 1727807152.4348192
sub_op(1) has released its context at 1727807153.4338317
sub_op(2) working with context <__main__.Context object at 0x000001D839AA6450> at 1727807153.4347506
sub_op(1) is doing other stuff without the context
sub_op(1) is acquiring its context at 1727807156.4415817
<__main__.Context object at 0x000001D839A8B710> is waiting for acquisition at 1727807156.4430528
sub_op(2) has released its context at 1727807156.4430528
sub_op(3) working with context <__main__.Context object at 0x000001D839AA6390> at 1727807156.4440517
sub_op(2) is doing other stuff without the context
sub_op(2) is acquiring its context at 1727807157.446158
<__main__.Context object at 0x000001D839AA6450> is waiting for acquisition at 1727807157.446158
sub_op(3) has released its context at 1727807159.4486222
sub_op(1) working with context <__main__.Context object at 0x000001D839A8B710> at 1727807159.45161
sub_op(3) is doing other stuff without the context
sub_op(3) is acquiring its context at 1727807162.4659755
<__main__.Context object at 0x000001D839AA6390> is waiting for acquisition at 1727807162.4669642
sub_op(1) has released its context at 1727807162.4669642
sub_op(2) working with context <__main__.Context object at 0x000001D839AA6450> at 1727807162.467968
sub_op(1) is doing other stuff without the context
sub_op(1)) finishing
sub_op(2) has released its context at 1727807165.4751408
sub_op(3) working with context <__main__.Context object at 0x000001D839AA6390> at 1727807165.4751408
sub_op(2) is doing other stuff without the context
sub_op(2)) finishing
sub_op(3) has released its context at 1727807168.469866
sub_op(3) is doing other stuff without the context
sub_op(3)) finishing
All tasks completed.
© www.soinside.com 2019 - 2024. All rights reserved.