我正在尝试使用 SQS 作为代理来设置 Celery 任务。我能够使用以下命令让它工作(最小可重现):
from celery import Celery
app = Celery('tasks', broker=f'sqs://{aws_access}:{aws_secret}@')
@app.task
def test(s):
print(s)
我运行这个
celery -A tasks worker --loglevel=INFO
,然后通过以下方式从 shell 调用它:
from tasks import test
test.delay('printme')
这个效果很好。问题是,我想在多个环境中运行它,所以我想为每个环境指定一个单独的队列。我找到了
queue_name_prefix
的文档,这似乎是我想要的,但我无法让它工作。
我尝试过的:
首先我添加了一个
config.py
文件:
broker_transport_options = {
'queue_name_prefix': 'dev-',
'region': 'us-east-1'
}
并运行它
celery -A tasks worker --loglevel=INFO --config=config
这会在 aws 上创建一个
dev-celery
队列,但是当我尝试 test.delay('printme')
时,它不会执行。
然后我注意到,如果我返回并运行没有
--config
标志的 celery,它会运行我的测试任务。我检查并确认 task_id
匹配,所以看起来,即使我运行 celery 来从 dev-celery
队列中 read,我仍然 writing 到
celery
队列。
我也尝试过使用
app.conf.update
来更新代码中的 celery 应用程序,但它似乎不起作用。
如何将预定作业放入
dev-celery
队列中?
所以我确实做到了这一点,尽管我不知道这是否是最佳方式。问题似乎是任务仍然被发送到默认的“celery”队列,即使工作人员现在正在侦听“dev-celery”队列。这是我最终让它发挥作用的方法:
在
tasks.py
代码中,我添加了对conf.update
的调用:
app = Celery('tasks')
app.config.update({
'broker_url': f'sqs://{aws_access}:{aws_secret}@',
'task_routes': {
'test': {'queue': 'dev-celery'}
}
})
然后在向队列发送任务时,使用
apply_async
方法显式声明要使用的队列:
test.apply_async(
args=['print test'],
queue='dev-celery'
)
希望对某人有帮助;)
我使用了以下代码作为queue_name_prefix... 它在生产环境中运行的 Flask 应用程序中..
from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
broker="sqs://",
broker_transport_options={
"queue_name_prefix": "{SERVICE_ENV}-{SERVICE_NAME}-"
},
)
task_base = celery.Task
class ContextTask(task_base):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return task_base.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery