我有以下代码,我在其中初始化监听队列的消费者。
consumer = MyConsumer()
consumer.declare_queue(queue_name="my-jobs")
consumer.declare_exchange(exchange_name="my-jobs")
consumer.bind_queue(
exchange_name="my-jobs", queue_name="my-jobs", routing_key="jobs"
)
consumer.consume_messages(queue="my-jobs", callback=consumer.consume)
问题是consum方法定义如下:
async def consume(self, channel, method, properties, body):
在consume方法中,我们需要await异步函数,但这会为consume函数产生错误“coroutine is not waiting”。有没有办法在 pika 中使用异步函数作为回调?
我用
@sync
注释了我的回调,其中同步是:
def sync(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
return asyncio.get_event_loop().run_until_complete(f(*args, **kwargs))
return wrapper
(在here找到了芹菜,但它也适用于鼠兔)
我也有类似的疑问,我最终使用了AsyncioConnection适配器。
class Consumer:
def __init__(self, loop, ...):
self._loop = loop
self._in_flight_tasks = set()
def connect(self):
return AsyncioConnection(
parameters=...
custom_ioloop=self._loop,
)
...
async def _handle_message(...):
...
def on_message(self, _unused_channel, basic_deliver, properties, body):
task = self._loop.create_task(self._handle_message(tag, properties, body))
self._in_flight_tasks.add(task)
task.add_done_callback(self._in_flight_tasks.discard)
...
请注意,我将事件循环传递给消费者。我使用应用程序顶部的
asyncio.new_event_loop()
创建它。我不确定这是否是必需的,但可能是因为 Pika 我们默认使用一些自定义事件循环实现。
大部分消费者代码取自 Pika examples。
有关为何将任务添加到集合中然后又被丢弃的解释,请参阅此处。