Celery + SQS - 使用“queue_name_prefix”写入错误的队列

问题描述 投票:0回答:2

我正在尝试使用 SQS 作为代理来设置 Celery 任务。我能够使用以下命令让它工作(最小可重现):

from celery import Celery

app = Celery('tasks', broker=f'sqs://{aws_access}:{aws_secret}@')

@app.task
def test(s):
  print(s)

我运行这个

celery -A tasks worker --loglevel=INFO
,然后通过以下方式从 shell 调用它:

from tasks import test
test.delay('printme')

这个效果很好。问题是,我想在多个环境中运行它,所以我想为每个环境指定一个单独的队列。我找到了

queue_name_prefix
的文档,这似乎是我想要的,但我无法让它工作。

我尝试过的:

首先我添加了一个

config.py
文件:

broker_transport_options = {
  'queue_name_prefix': 'dev-',
  'region': 'us-east-1'
}

并运行它

celery -A tasks worker --loglevel=INFO --config=config

这会在 aws 上创建一个

dev-celery
队列,但是当我尝试
test.delay('printme')
时,它不会执行。

然后我注意到,如果我返回并运行没有

--config
标志的 celery,它会运行我的测试任务。我检查并确认
task_id
匹配,所以看起来,即使我运行 celery 来从 dev-celery 队列中
read
,我仍然 writing
celery
队列。

我也尝试过使用

app.conf.update
来更新代码中的 celery 应用程序,但它似乎不起作用。

如何将预定作业放入

dev-celery
队列中?

celery
2个回答
4
投票

所以我确实做到了这一点,尽管我不知道这是否是最佳方式。问题似乎是任务仍然被发送到默认的“celery”队列,即使工作人员现在正在侦听“dev-celery”队列。这是我最终让它发挥作用的方法:

tasks.py
代码中,我添加了对
conf.update
的调用:

app = Celery('tasks')
app.config.update({
    'broker_url': f'sqs://{aws_access}:{aws_secret}@',
    'task_routes': {
        'test': {'queue': 'dev-celery'}
    }
})

然后在向队列发送任务时,使用

apply_async
方法显式声明要使用的队列:

test.apply_async(
    args=['print test'],
    queue='dev-celery'
)

希望对某人有帮助;)


0
投票

我使用了以下代码作为queue_name_prefix... 它在生产环境中运行的 Flask 应用程序中..

from celery import Celery

def make_celery(app):
    celery = Celery(
        app.import_name,
        broker="sqs://",
        broker_transport_options={
            "queue_name_prefix": "{SERVICE_ENV}-{SERVICE_NAME}-"
        },
    )
    task_base = celery.Task

    class ContextTask(task_base):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return task_base.__call__(self, *args, **kwargs)

    celery.Task = ContextTask

    return celery
© www.soinside.com 2019 - 2024. All rights reserved.