我有这个代码
class GameMatchesWorker:
    """Consume game-match messages and cache a per-source match schedule.

    The schedule lives in ``_match_schedular`` as
    ``{source: {game_id: match_meta}}`` and is refreshed through the provider
    client whenever a consumed match is not found in it.
    """

    def __init__(
        self,
        consumer: consumers.AbstractConsumer,
        producer: producers.AbstractProducer,
        provider_client: fast_data.FastDataClient,
    ) -> None:
        """Store the collaborators and create the empty in-memory state."""
        self._consumer = consumer
        self._producer = producer
        self._provider_client = provider_client
        self._matches = set()
        # Provider responses popped by ``_receive``; filled elsewhere (not shown here).
        self._input_buffer = []
        # ``_receive`` waits on this event while ``_input_buffer`` is empty.
        self._input_event = asyncio.Event()
        # Maps source -> {game_id: match_meta}; written by ``_get_update_schedule``.
        self._match_schedular = {}
        # Created here but not used by any method visible in this snippet.
        self._lock = asyncio.Lock()

    async def _receive(self) -> dict:
        """Pop the oldest buffered provider response and index it by ``game_id``.

        Waits on ``_input_event`` (10 second timeout per wait) until the
        buffer is non-empty.

        Returns:
            ``{game_id: match_meta}`` built from the response's ``"list"``
            when its ``"code"`` is ``StatusCodes.SUCCESS``; an empty dict
            otherwise, so the declared ``dict`` return type always holds.

        Raises:
            asyncio.TimeoutError: if the event is not set within 10 seconds.
        """
        while not self._input_buffer:
            await asyncio.wait_for(self._input_event.wait(), timeout=10)
            self._input_event.clear()
        received_data = self._input_buffer.pop(0)
        if received_data.get("code") != fast_data.StatusCodes.SUCCESS:
            return {}
        return {
            game_match_meta["game_id"]: game_match_meta
            for game_match_meta in received_data["list"]
        }

    async def _get_update_schedule(self):
        """Request the current day's games list for sources 1-5 and cache each reply."""
        current_date = datetime.now()
        for source in range(1, 6):
            await self._provider_client.emit_games_list(
                date_from=current_date, date_to=current_date, source=source
            )
            matches_schedule = await self._receive()
            self._match_schedular[source] = matches_schedule

    def _check_match_in_schedule(self, match_id: int) -> bool:
        """Return True if ``match_id`` is a key of any cached per-source schedule."""
        # Iterate the per-source schedules themselves; using a schedule dict as
        # a key into ``_match_schedular`` would raise TypeError (unhashable).
        return any(match_id in schedule for schedule in self._match_schedular.values())

    async def _consume_callback(self, game_match_message: consumers.GameMatchMessage) -> None:
        """Handle one consumed message, refreshing the schedule on a cache miss."""
        with TRACER.start_as_current_span("fastdata_workerhost_consume_message"):
            game_match_id = game_match_message.game_id
            if not self._check_match_in_schedule(game_match_id):
                # NOTE: this await yields to the event loop, so other callbacks
                # can start running before the refresh has completed.
                await self._get_update_schedule()
当代码执行到 _consume_callback 中的 self._get_update_schedule() 时，执行上下文会切换到下一条消息。在这种场景下，我需要让这段逻辑同步执行，即在它完成之前不去处理下一条消息。使用的库是 aio-pika。
我尝试过在程序的不同层级使用 asyncio.Lock()。
我不熟悉 aio-pika，也不能 100% 确定你的问题，但我理解你的意思是：当 _consume_callback 被调用时，你需要它一直运行到完成，期间同一线程中不会有其他异步任务被调度运行，也就是说，你不希望 _consume_callback 的执行被挂起。
如果这是正确的，你应该去掉 _get_update_schedule 方法的 async 修饰，使其成为同步方法。但接下来的问题就变成：这个同步方法如何调用异步的 _provider_client.emit_games_list 和 _receive 方法呢？我会采用的做法是创建一个 concurrent.futures.ThreadPoolExecutor 实例，并让 _consume_callback 把 _get_update_schedule 提交到线程池中运行。这样，_consume_callback 不会 await 任何内容，因此不会让其他异步任务得以运行；而 _get_update_schedule 在另一个线程中运行，可以使用 asyncio.run() 调用任何异步方法。由于不了解 _provider_client.emit_games_list 和 _receive 的语义，我无法保证在另一个线程中运行它们不会出问题。
以下演示程序演示了该技术:
import asyncio
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(1)  # One worker thread is all this demo needs


class MyClass:
    """Minimal demo: a coroutine hands synchronous work to a worker thread."""

    # ... (the rest of the class is elided in this demo)

    def foo(self):
        """Run synchronously in the pool thread, driving ``bar`` five times."""
        # Each coroutine call is driven to completion with asyncio.run.
        for _ in range(1, 6):
            asyncio.run(self.bar())

    async def bar(self):
        """Stand-in coroutine that only prints a marker."""
        print('bar')

    async def foobar(self) -> None:
        """Submit ``foo`` to the pool and block until it has finished."""
        # loop.run_in_executor is deliberately avoided: it returns a future
        # that would have to be awaited.
        pending = pool.submit(self.foo)
        pending.result()


obj = MyClass()
asyncio.run(obj.foobar())
``
因此，你的代码将变成：
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(1)  # Only need a single thread


class GameMatchesWorker:
    """Variant of the worker that refreshes its schedule in a worker thread."""

    # Remaining members (e.g. __init__, _receive) are elided in this snippet.
    ...

    def _get_update_schedule(self):
        """Refresh the cached schedule for sources 1-5.

        Synchronous on purpose: it is meant to run in the pool thread, where
        each coroutine is driven to completion with ``asyncio.run``.
        """
        # NOTE(review): whether the provider client and ``_receive`` behave
        # correctly when driven by asyncio.run in another thread is
        # unverified -- confirm against their implementations.
        current_date = datetime.now()
        for source in range(1, 6):
            asyncio.run(
                self._provider_client.emit_games_list(
                    date_from=current_date, date_to=current_date, source=source
                )
            )
            matches_schedule = asyncio.run(self._receive())
            self._match_schedular[source] = matches_schedule

    def _check_match_in_schedule(self, match_id: int) -> bool:
        """Return True if ``match_id`` is a key of any cached per-source schedule.

        This must stay a plain (non-async) method: the caller uses its result
        directly in ``if not ...``, and an un-awaited coroutine object is
        always truthy, which would skip every refresh.
        """
        # Iterate the schedules themselves (a schedule dict cannot be used as
        # a key into ``_match_schedular``) and skip any entry stored as None.
        return any(
            schedule is not None and match_id in schedule
            for schedule in self._match_schedular.values()
        )

    async def _consume_callback(self, game_match_message: consumers.GameMatchMessage) -> None:
        """Handle one consumed message, refreshing the schedule on a cache miss."""
        with TRACER.start_as_current_span("fastdata_workerhost_consume_message"):
            game_match_id = game_match_message.game_id
            if not self._check_match_in_schedule(game_match_id):
                # Future.result() blocks the calling thread until the refresh
                # is done, so nothing is awaited here and no other task on
                # this event loop gets to run in the meantime.
                pool.submit(self._get_update_schedule).result()