Below are code snippets for a Flask + Celery + RabbitMQ setup. They work locally, but when I deploy to an EC2 instance I get errno 111 (connection refused).
start_celery.py
from celery import Celery, Task
from flask import Flask
import os
from config import *


def create_celery(app):
    class ContextTask(Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery = Celery(app.name, task_cls=ContextTask)
    celery.config_from_object(app.config["CELERY"])
    celery.conf.update(app.config)
    celery.conf.task_default_queue = SERVER_TYPE + '_queue'
    celery.conf.task_default_exchange = 'celery:' + SERVER_TYPE
    celery.conf.task_default_routing_key = SERVER_TYPE + '.task'
    celery.conf.broker_connection_options = {'broker_pool_limit': 0}
    celery.conf.broker_transport_options = {'confirm_publish': True}
    app.extensions["celery"] = celery
    return celery


app = Flask(__name__)
app.config.update(
    CELERY=dict(
        broker_url=RABBITMQ_URL,
        result_backend='rpc://',
        task_ignore_result=False,
    ))
celery_app = create_celery(app)
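The worker is then started against this module, e.g. (the exact invocation is an assumption; the module path matches the import used in celery_impl.py below):

celery -A queue_manager.start_celery:celery_app worker --loglevel=info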
celery_impl.py
from celery import shared_task
from backend.queue_manager.subscriber_job_types import SUBSCRIBER_JOBS
from queue_manager.start_celery import celery_app
import os
from config import *


class Celery:
    def publish(self, *args, **kwargs):
        # self.subscriber.delay(task, args, kwargs)
        exchange_name = 'celery:' + SERVER_TYPE
        routing_key = SERVER_TYPE + '.task'
        queue_name = SERVER_TYPE + '_queue'
        return self.subscribe.apply_async(args=args, kwargs=kwargs, exchange=exchange_name,
                                          routing_key=routing_key, queue=queue_name)

    @shared_task(bind=True)
    def subscribe(self, *args, **kwargs):
        task = kwargs.get('task')
        kwargs.pop('task')
        val = SUBSCRIBER_JOBS[task]["worker"](*args, **kwargs)
        kwargs['task'] = task
        return val
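For context, publish is invoked roughly like this from the Flask side (a sketch; the module path, the 'send_email' task name, and its kwargs are made up, assuming SUBSCRIBER_JOBS maps that name to a "worker" callable):

from queue_manager.celery_impl import Celery

queue = Celery()
result = queue.publish(task='send_email', recipient='user@example.com')
print(result.get(timeout=10))  # the rpc:// backend with task_ignore_result=False allows fetching the return value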
The app created in start_celery.py is imported into the main file, which calls app.run. RABBITMQ_URL changes depending on the environment, and yes, I checked the relevant inbound and outbound rules for these instances; the RabbitMQ container and the code container can reach each other on all ports.
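For reference, a minimal sketch of what that main file looks like (the file name and the host/port values are assumptions):

from queue_manager.start_celery import app, celery_app

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000)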
I tried various solutions from the internet, for example adding

from .celery import app as celery_app
__all__ = ('celery_app',)

and a few others, but none of them worked.
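One way to reproduce the errno 111 outside of Flask and Celery is to open a raw broker connection with kombu (which ships as a Celery dependency); a minimal sketch, assuming RABBITMQ_URL is importable from config as in the snippets above:

from kombu import Connection
from config import RABBITMQ_URL

try:
    with Connection(RABBITMQ_URL, connect_timeout=5) as conn:
        conn.connect()  # raises OSError with errno 111 if the broker refuses the TCP connection
        print('broker reachable')
except OSError as exc:
    print(f'broker unreachable: {exc}')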
Check whether your Flask server can communicate with the Celery worker inside the EC2 instance. I ran into this exact problem when I was using Docker with my Celery worker running under network_mode: 'host' while my server was on the container network.
# compose.yml
services:
  server:
    build: .
    container_name: slice_backend
    restart: always
    ports:
      - "8000:8000"
    volumes:
      - .:/app

  celery:
    build: .
    container_name: celery_worker
    command: celery -A tasks worker
    volumes:
      - .:/app
    depends_on:
      - server
      - rabbitMQ
    environment:
      - CELERY_BROKER_URL=pyamqp://guest:guest@rabbitMQ:5672//
      - CELERY_RESULT_BACKEND=rpc://
    network_mode: host  # the mismatch: the worker joins the host network while the server stays on the compose network
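For comparison, a sketch with all three services on the same default compose network (no network_mode: host); the rabbitMQ service definition here is an assumption, since it is not shown above:

services:
  rabbitMQ:
    image: rabbitmq:3-management  # assumed image tag

  server:
    build: .
    ports:
      - "8000:8000"
    depends_on:
      - rabbitMQ

  celery:
    build: .
    command: celery -A tasks worker
    depends_on:
      - rabbitMQ
    environment:
      - CELERY_BROKER_URL=pyamqp://guest:guest@rabbitMQ:5672//
      - CELERY_RESULT_BACKEND=rpc://
    # no network_mode: host, so the worker resolves the rabbitMQ hostname via Docker's embedded DNS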