我有一个由Gunicorn运行的Flask Web应用程序。该Web应用程序有几个端点,在这些端点上接收JSON数据,对其进行处理,然后需要将处理后的数据发布到RabbitMQ交换中以供其他微服务获取,以进行进一步处理。 HTTP端无需响应。
我有一个名为rabbit.py的文件,其内容如下:
import pika
import json
import os
credentials = pika.PlainCredentials(os.environ['AMQP_USER'], os.environ['AMQP_USER'])
parameters = pika.ConnectionParameters(os.environ['AMQP_HOST'],
os.environ['AMQP_PORT'],
'/',
credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.exchange_declare(exchange=os.environ['AMQP_EXCHANGE'],
exchange_type='fanout',
durable=True)
def publish(data):
channel.basic_publish(exchange=os.environ['AMQP_EXCHANGE'],
routing_key='',
body=json.dumps(data),
properties=pika.BasicProperties(delivery_mode = 2))
然后从Flask端点函数中执行publish(data_object)
以将数据发布到RabbitMQ。
问题是由于未处理ping导致与RabbitMQ的连接超时
rabbitmq_1 | 2020-04-01 08:56:45.911 [info] <0.806.0> accepting AMQP connection <0.806.0> (172.19.0.3:48856 -> 172.19.0.2:5672)
rabbitmq_1 | 2020-04-01 08:56:45.915 [info] <0.806.0> connection <0.806.0> (172.19.0.3:48856 -> 172.19.0.2:5672): user 'guest' authenticated and granted access to vhost '/'
rabbitmq_1 | 2020-04-01 09:02:45.920 [error] <0.806.0> closing AMQP connection <0.806.0> (172.19.0.3:48856 -> 172.19.0.2:5672):
rabbitmq_1 | missed heartbeats from client, timeout: 60s
ingress-api_1 | [2020-04-01 09:06:16,835] ERROR in app: Exception on /ttn/v2 [POST]
ingress-api_1 | Traceback (most recent call last):
ingress-api_1 | File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2446, in wsgi_app
ingress-api_1 | response = self.full_dispatch_request()
ingress-api_1 | File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1951, in full_dispatch_request
ingress-api_1 | rv = self.handle_user_exception(e)
ingress-api_1 | File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1820, in handle_user_exception
ingress-api_1 | reraise(exc_type, exc_value, tb)
ingress-api_1 | File "/usr/local/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
ingress-api_1 | raise value
ingress-api_1 | File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1949, in full_dispatch_request
ingress-api_1 | rv = self.dispatch_request()
ingress-api_1 | File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1935, in dispatch_request
ingress-api_1 | return self.view_functions[rule.endpoint](**req.view_args)
ingress-api_1 | File "/app/main.py", line 28, in ttn_v2_endpoint
ingress-api_1 | ttn.parse_ttn_http_post_v2(request.get_json())
ingress-api_1 | File "/app/ttn.py", line 72, in parse_ttn_http_post_v2
ingress-api_1 | rabbit.publish(parsed_object)
ingress-api_1 | File "/app/rabbit.py", line 18, in publish
ingress-api_1 | channel.basic_publish(exchange=os.environ['AMQP_EXCHANGE'],
ingress-api_1 | File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2248, in basic_publish
ingress-api_1 | self._flush_output()
ingress-api_1 | File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1336, in _flush_output
ingress-api_1 | self._connection._flush_output(lambda: self.is_closed, *waiters)
ingress-api_1 | File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 522, in _flush_output
ingress-api_1 | raise self._closed_result.value.error
ingress-api_1 | pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')
有一些第三方扩展将rabbitmq带入flask。然后有一些建议如何实现这一点。有些人使用芹菜。
我需要的是一种简单直接的将数据发布到Rabbit的简单方法。我不需要订阅回复。我可以禁用ping保持活动,或者最好和最简单的解决方案是什么?