Django + Celery + SQS - celery 工作人员收到但未执行的消息?

问题描述 投票:0回答:1

我正在尝试使用 Celery 和 AWS SQS 作为消息代理在 Django Web 应用程序上实现后台任务。我有一个调用任务的视图,该任务已成功发送到 SQS。在我启动 Celery Worker 的控制台窗口中,我收到一条声明,表示已收到任务。然而,实际的任务永远不会被执行(即我在任务中添加的打印语句永远不会被打印)。

我已经在我的

settings.py
-

中配置了 Celery
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_TASK_DEFAULT_QUEUE = 'tk-test-queue'
CELERY_BROKER_URL = "sqs://%s:%s@" % (quote(os.environ.get('AWS_ACCESS_KEY_ID'), safe=''), quote(os.environ.get('AWS_SECRET_ACCESS_KEY'), safe=''))
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'region': 'ap-south-1',
    'visibility-timeout': 60 * 30,
    'polling_interval': 1
}
CELERY_RESULT_BACKEND = None

我的

celery.py
看起来像这样-

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_project.settings')

app = Celery('my_project')
app.config_from_object('django.conf:settings', namespace="CELERY")
app.autodiscover_tasks()

我在

my_app/tasks.py
-

中定义了一个任务
from time import sleep, ctime

from celery import shared_task


@shared_task
def export_report_to_s3():
    print(f'Starting task at {ctime()}')
    sleep(25)
    print(f'Task finished at {ctime()}')

我在我的

my_app/views.py
-

中使用它
class TestCeleryView(View):
    def get(self, request):
        print('In view')
        export_report_to_s3.delay()
        return HttpResponse('Success')

我正在通过运行 -

来启动芹菜工人
celery -A my_project worker -l INFO

每次访问视图函数的 URL 时,我都会从视图中收到成功响应,并且 SQS 上的正在进行的请求数量会增加。但我没有得到芹菜工人的输出。这是我第一次使用 Celery 和 SQS。我做错了什么?

django celery amazon-sqs
1个回答
0
投票

我在 Celery 5.3.1 中看到了完全相同的问题。

我必须将 Celery 更新到 >5.3.6 才能获得此修复:

更新 kombu>=5.3.4 以修复 SQS 请求与 boto JSON 序列化程序的兼容性 (#8646)

在此之后,SQS 消息实际上被 Celery 读取并处理了

https://docs.celeryq.dev/en/stable/changelog.html#version-5-3-6

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.