我有一个 Python 3.9 项目,我想在其中运行一个连接到 AMQP 1.0 代理并侦听传入消息的服务。相同的服务还应该提供 RESTful API,以便触发消息发送到代理。
我使用的图书馆:
这是消息接收器初始化和启动的示例:
from proton.handlers import MessagingHandler
from proton.reactor import Container
class Receiver(MessagingHandler):
def __init__(self, url, address):
super(Receiver, self).__init__()
self.url = url
self.address = address
def on_start(self, event):
event.container.connect(self.url)
def on_connection_opened(self, event):
print("Connected to {}".format(self.url))
event.container.create_receiver(event.connection, self.address)
def on_message(self, event):
print("Received message: {}".format(event.message.body))
url = "amqps://user:pass@localhost:5671"
address = "example"
handler = Receiver(url, address)
receiving_container = Container(handler)
try:
receiving_container.run()
print("Container started")
except KeyboardInterrupt:
pass
现在我的问题是,一旦我使用 run() 启动receive_container,它就会阻止在侦听消息时执行任何其他代码。这可以防止我同时运行除 receive_container 之外的任何其他内容。
我知道有 asyncio 可以帮助解决此类问题,我已经尝试过但没有得出任何结论。我可以使用 Uvicorn 异步运行 FastAPI,但我相信容器本身必须以异步方式实现才能与 asyncio 一起工作。
有人知道这个问题的解决方案/解决方法吗?
asyncio
接口,因此您可以将其用于 HTTP API 服务。因此,请使用 asyncio
事件循环来处理您的 HTTP API,然后当您收到应转发到消息代理的请求时,您可以使用 asyncio.to_thread(<whatever sends a message to the broker>)
来执行此操作。
由于
receiving_container.run()
希望拥有其运行的整个线程,因此您需要启动 asyncio
事件循环,在其自己的线程上运行 HTTP API。我已经在这里解释了如何做到这一点:
或者您可以在其自己的线程上启动 AMQP 消息服务,让 HTTP API
asyncio
事件循环在主线程上运行。