我正在致力于集成 RabbitMQ 和 GCP Pub/Sub。 我正在使用 Python 的
Pika
库。通过 BlockingConnection
建立连接。根据我执行的一些初步测试,解决方案似乎有效,但我对回调函数的构造方式有一些疑问。我正在调用
publish
方法发布到 GCP 中的 Pub/Sub,然后在块尝试中,basic_ack
处理已处理的消息。
如果有人在这方面有经验,我能否要求对我的解决方案发表一些意见,并可能提供一些如何实施的示例。
def consume_data_callback(self, basic_deliver, body):
# ... some code
future = self.publisher.publish(topic_path, self.payload.SerializeToString())
try:
message_id = future.result(timeout=1)
self.channel.basic_ack(basic_deliver.delivery_tag)
except Exception as e:
future.cancel()
_logger.error("Result after publishing Pub/Sub with: {}".format(e))
谢谢您的回答。
我个人还没有尝试过,但我确实搜索了可能对您有帮助的参考资料。这篇RabbitMQ - 发布/订阅文章有一个回调代码示例,可以帮助您确认您所编写的内容是否是合适的解决方案。
GitHub 完整代码版本:
#!/usr/bin/env python
import os
import pika
import sys
def main():
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
def callback(ch, method, properties, body):
print(f" [x] {body.decode()}")
print(' [*] Waiting for logs. To exit press CTRL+C')
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)