从Flask发布到RabbitMQ

问题描述 投票:1回答:1

我有一个由Gunicorn运行的Flask Web应用程序。该Web应用程序有几个端点,在这些端点上接收JSON数据,对其进行处理,然后需要将处理后的数据发布到RabbitMQ交换中以供其他微服务获取,以进行进一步处理。 HTTP端无需响应。

我有一个名为rabbit.py的文件,其内容如下:

import pika
import json
import os

credentials = pika.PlainCredentials(os.environ['AMQP_USER'], os.environ['AMQP_USER'])
parameters = pika.ConnectionParameters(os.environ['AMQP_HOST'],
                                       os.environ['AMQP_PORT'],
                                       '/',
                                       credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

channel.exchange_declare(exchange=os.environ['AMQP_EXCHANGE'],
                         exchange_type='fanout',
                         durable=True)

def publish(data):
    channel.basic_publish(exchange=os.environ['AMQP_EXCHANGE'],
                        routing_key='',
                        body=json.dumps(data),
                        properties=pika.BasicProperties(delivery_mode = 2))

然后从Flask端点函数中执行publish(data_object)以将数据发布到RabbitMQ。

问题是由于未处理ping导致与RabbitMQ的连接超时

rabbitmq_1     | 2020-04-01 08:56:45.911 [info] <0.806.0> accepting AMQP connection <0.806.0> (172.19.0.3:48856 -> 172.19.0.2:5672)
rabbitmq_1     | 2020-04-01 08:56:45.915 [info] <0.806.0> connection <0.806.0> (172.19.0.3:48856 -> 172.19.0.2:5672): user 'guest' authenticated and granted access to vhost '/'
rabbitmq_1     | 2020-04-01 09:02:45.920 [error] <0.806.0> closing AMQP connection <0.806.0> (172.19.0.3:48856 -> 172.19.0.2:5672):
rabbitmq_1     | missed heartbeats from client, timeout: 60s
ingress-api_1  | [2020-04-01 09:06:16,835] ERROR in app: Exception on /ttn/v2 [POST]
ingress-api_1  | Traceback (most recent call last):
ingress-api_1  |   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2446, in wsgi_app
ingress-api_1  |     response = self.full_dispatch_request()
ingress-api_1  |   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1951, in full_dispatch_request
ingress-api_1  |     rv = self.handle_user_exception(e)
ingress-api_1  |   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1820, in handle_user_exception
ingress-api_1  |     reraise(exc_type, exc_value, tb)
ingress-api_1  |   File "/usr/local/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
ingress-api_1  |     raise value
ingress-api_1  |   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1949, in full_dispatch_request
ingress-api_1  |     rv = self.dispatch_request()
ingress-api_1  |   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1935, in dispatch_request
ingress-api_1  |     return self.view_functions[rule.endpoint](**req.view_args)
ingress-api_1  |   File "/app/main.py", line 28, in ttn_v2_endpoint
ingress-api_1  |     ttn.parse_ttn_http_post_v2(request.get_json())
ingress-api_1  |   File "/app/ttn.py", line 72, in parse_ttn_http_post_v2
ingress-api_1  |     rabbit.publish(parsed_object)
ingress-api_1  |   File "/app/rabbit.py", line 18, in publish
ingress-api_1  |     channel.basic_publish(exchange=os.environ['AMQP_EXCHANGE'],
ingress-api_1  |   File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2248, in basic_publish
ingress-api_1  |     self._flush_output()
ingress-api_1  |   File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1336, in _flush_output
ingress-api_1  |     self._connection._flush_output(lambda: self.is_closed, *waiters)
ingress-api_1  |   File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 522, in _flush_output
ingress-api_1  |     raise self._closed_result.value.error
ingress-api_1  | pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')

有一些第三方扩展将rabbitmq带入flask。然后有一些建议如何实现这一点。有些人使用芹菜。

我需要的是一种简单直接的将数据发布到Rabbit的简单方法。我不需要订阅回复。我可以禁用ping保持活动,或者最好和最简单的解决方案是什么?

python flask rabbitmq
1个回答
0
投票
它没有相同的连接错误,因此似乎可以正常工作。
© www.soinside.com 2019 - 2024. All rights reserved.