我正在尝试使用 Celery 和 AWS SQS 作为消息代理在 Django Web 应用程序上实现后台任务。我有一个调用任务的视图,该任务已成功发送到 SQS。在我启动 Celery Worker 的控制台窗口中,我收到一条声明,表示已收到任务。然而,实际的任务永远不会被执行(即我在任务中添加的打印语句永远不会被打印)。
我已经在我的
settings.py
- 中配置了 Celery
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_TASK_DEFAULT_QUEUE = 'tk-test-queue'
CELERY_BROKER_URL = "sqs://%s:%s@" % (quote(os.environ.get('AWS_ACCESS_KEY_ID'), safe=''), quote(os.environ.get('AWS_SECRET_ACCESS_KEY'), safe=''))
CELERY_BROKER_TRANSPORT_OPTIONS = {
'region': 'ap-south-1',
'visibility-timeout': 60 * 30,
'polling_interval': 1
}
CELERY_RESULT_BACKEND = None
我的
celery.py
看起来像这样-
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_project.settings')
app = Celery('my_project')
app.config_from_object('django.conf:settings', namespace="CELERY")
app.autodiscover_tasks()
我在
my_app/tasks.py
- 中定义了一个任务
from time import sleep, ctime
from celery import shared_task
@shared_task
def export_report_to_s3():
print(f'Starting task at {ctime()}')
sleep(25)
print(f'Task finished at {ctime()}')
我在我的
my_app/views.py
- 中使用它
class TestCeleryView(View):
def get(self, request):
print('In view')
export_report_to_s3.delay()
return HttpResponse('Success')
我正在通过运行 -
来启动芹菜工人celery -A my_project worker -l INFO
每次访问视图函数的 URL 时,我都会从视图中收到成功响应,并且 SQS 上的正在进行的请求数量会增加。但我没有得到芹菜工人的输出。这是我第一次使用 Celery 和 SQS。我做错了什么?
我在 Celery 5.3.1 中看到了完全相同的问题。
我必须将 Celery 更新到 >5.3.6 才能获得此修复:
更新 kombu>=5.3.4 以修复 SQS 请求与 boto JSON 序列化程序的兼容性 (#8646)
在此之后,SQS 消息实际上被 Celery 读取并处理了
https://docs.celeryq.dev/en/stable/changelog.html#version-5-3-6