如何在Asycnio中防止上下文切换

问题描述 投票:0回答:1

我有这个代码

class GameMatchesWorker:
    def __init__(
        self,
        consumer: consumers.AbstractConsumer,
        producer: producers.AbstractProducer,
        provider_client: fast_data.FastDataClient,
    ) -> None:
        self._consumer = consumer
        self._producer = producer
        self._provider_client = provider_client
        self._matches = set()
        self._input_buffer = []
        self._input_event = asyncio.Event()
        self._match_schedular = {}
        self._lock = asyncio.Lock()

        async def _receive(self) -> dict:
        while not self._input_buffer:
            await asyncio.wait_for(self._input_event.wait(), timeout=10)
            self._input_event.clear()
        received_data = self._input_buffer.pop(0)

        if received_data.get("code") == fast_data.StatusCodes.SUCCESS:
            return {game_match_meta["game_id"]: game_match_meta for game_match_meta in received_data["list"]}


    async def _get_update_schedule(self):
        current_date = datetime.now()
        for source in range(1,6):
            await self._provider_client.emit_games_list(
                date_from=current_date, date_to=current_date, source=source
            )
            matches_schedule = await self._receive()
            self._match_schedular[source] = matches_schedule

    def _check_match_in_schedule(self, match_id: int):
        for source in self._match_schedular.values():
            if match_id in self._match_schedular[source]:
                return True

    async def _consume_callback(self, game_match_message: consumers.GameMatchMessage) -> None:
        with TRACER.start_as_current_span("fastdata_workerhost_consume_message"):
            game_match_id = game_match_message.game_id
            if not self._check_match_in_schedule(game_match_id):
                await self._get_update_schedule()

当代码执行到_consume_callback中的self._get_update_schedule()方法时,执行上下文切换到下一条消息,在这种场景下,我需要实现同步执行。使用库aio-pika

我尝试安装异步锁。 Lock() 在不同的程序级别

python python-asyncio
1个回答
0
投票

我不熟悉 aio-pika,我也不是 100% 清楚你在问什么,但我认为你是说,当调用

_consume_callback
时,你需要完成它,而不需要在同一线程中运行另一个异步任务已调度,即您不希望
_consume_callback
的执行被暂停。

如果这是正确的,您应该从方法

async
中删除
_get_update_schedule
属性以使其同步。但接下来的问题就变成了这个同步方法如何调用异步
_provider_client.emit_games_list
receive
方法呢?我将使用的技术是创建一个
concurrent.futures.ThreadPoolExecutor
实例并让
_consume_callback
在线程池中运行。因此,
_consume_callback
不会等待任何内容,因此不会让另一个异步任务运行,并且
_get_update_schedule
将在另一个线程中运行,并且能够使用
asyncio.run()
调用任何异步方法。不知道的语义我不能保证在另一个线程中运行
_provider_client.emit_games_list
receive
不会有问题。

以下演示程序演示了该技术:

import asyncio

from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(1)  # Only need a single thread

class MyClass:
    ...

    # This is now synchronous running in a nother thread
    # and we can use asyncio.run to invoke the async coroutines:
    def foo(self):
        for source in range(1,6):
            # Call async routine with asyncio.run:
            asyncio.run(self.bar())

    async def bar(self):
        print('bar')

    async def foobar(self) -> None:
        # Note that we are not using loop.run_in_executor
        # since that returns a future that must be awaited:
        future = pool.submit(self.foo)
        result = future.result()


obj = MyClass()
asyncio.run(obj.foobar())
``

Your code will therefore be:

```python
import asyncio

from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(1)  # Only need a single thread

class GameMatchesWorker:
    ...

    # This is now synchronous running in a nother thread
    # and we can use asyncio.run to invoke the async coroutines:
    def _get_update_schedule(self):
        current_date = datetime.now()
        for source in range(1,6):
            asyncio.run(self._provider_client.emit_games_list(
                date_from=current_date, date_to=current_date, source=source
                )
            )
            matches_schedule = asyncio.run(self._receive())
            self._match_schedular[source] = matches_schedule

    async def _check_match_in_schedule(self, match_id: int):
        for source in self._match_schedular.values():
            if match_id in self._match_schedular[source]:
                return True

    async def _consume_callback(self, game_match_message: consumers.GameMatchMessage) -> None:
        with TRACER.start_as_current_span("fastdata_workerhost_consume_message"):
            game_match_id = game_match_message.game_id
            if not self._check_match_in_schedule(game_match_id):
                future = pool.submit(self._get_update_schedule)
                result = future.result()
© www.soinside.com 2019 - 2024. All rights reserved.