Flask + Celery + RabbitMQ - kombu.exceptions.operationalerror errno 111 连接被拒绝

问题描述 投票:0回答:1

下面是flask+celery+rabbitMQ的代码片段,问题是它们在本地工作,但是当我部署在EC2实例上时,我收到errno 111连接被拒绝。

start_celery.py

from celery import Celery, Task
from flask import Flask
import os
from config import *

def create_celery(app):
    class ContextTask(Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery = Celery(app.name, task_cls=ContextTask)
    celery.config_from_object(app.config["CELERY"])
    celery.conf.update(app.config)
    celery.conf.task_default_queue = SERVER_TYPE + '_queue'
    celery.conf.task_default_exchange = 'celery:' + SERVER_TYPE
    celery.conf.task_default_routing_key = SERVER_TYPE + '.task'
    celery.conf.broker_connection_options = {'broker_pool_limit': 0}
    celery.conf.broker_transport_options = {'confirm_publish': True}
    app.extensions["celery"] = celery

    return celery


app = Flask(__name__)
app.config.update(
    CELERY=dict(
        broker_url=RABBITMQ_URL,
        result_backend='rpc://',
        task_ignore_result=False,
    ))
celery_app = create_celery(app)

celery_impl.py

from celery import shared_task

from backend.queue_manager.subscriber_job_types import SUBSCRIBER_JOBS

from queue_manager.start_celery import celery_app
import os
from config import *


class Celery:

    def publish(self, *args, **kwargs):
        # self.subscriber.delay(task, args, kwargs)
        exchange_name = 'celery:' + SERVER_TYPE
        routing_key = SERVER_TYPE + '.task'
        queue_name = SERVER_TYPE + '_queue'
        return self.subscribe.apply_async(args=args, kwargs=kwargs, exchange=exchange_name,
                                          routing_key=routing_key, queue=queue_name)

    @shared_task(bind=True)
    def subscribe(self, *args, **kwargs):
        task = kwargs.get('task')
        kwargs.pop('task')
        val = SUBSCRIBER_JOBS[task]["worker"](*args, **kwargs)
        kwargs['task'] = task
        return val

并且在 start_celery.py 中创建的应用程序已导入到正在使用 app.run 的主文件中。 RABBITMQ_URL 根据环境的不同而变化,是的,我检查了这些实例的相关入站和出站规则,它们的所有端口的rabbit mq 容器和代码容器都可以相互访问

尝试了网上各种解决方案,例如 从 .celery 将应用程序导入为 celery_app

全部 = ('celery_app',)

还有其他一些,但都不起作用

flask rabbitmq celery kombu
1个回答
0
投票

检查您的 Flask 服务器是否可以与 ec2 实例中的 celery 工作进行通信。当我使用 docker 并让我的 celery Worker 在

netowork_mode: 'host'
上运行并且我的服务器正在使用容器网络时,我遇到了这个问题。

#compose.yml
      server:
        build: .
        container_name: slice_backend
        restart: always
        ports:
          - "8000:8000"
        volumes:
          - .:/app
    
      celery:
        build: .
        container_name: celery_worker
        command: celery -A tasks worker 
        volumes:
          - .:/app
        depends_on:
          - server
          - rabbitMQ
        environment:
          - CELERY_BROKER_URL=pyamqp://guest:guest@rabbitMQ:5672//
          - CELERY_RESULT_BACKEND=rpc://
        network_mode: host
最新问题
© www.soinside.com 2019 - 2024. All rights reserved.