Problem running a simple Celery task in a Django application with RabbitMQ as the broker


I am trying to learn how to set up a Django application to use Celery. I am running RabbitMQ in Docker Desktop on a Windows machine with the following command:

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq

Then I start the Celery worker with this command and monitor it with Flower:

celery -A a_shop worker -l Info   
celery -A a_shop flower

In the main `__init__.py` of the Django project I have this:

from .celery import app as celery_app

__all__ = ('celery_app',)

For the Celery settings in the settings.py file, I have the following:

CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERY_TASK_SOFT_TIME_LIMIT = 600  # 10 minutes
CELERY_TASK_TIME_LIMIT = 1200
CELERY_IMPORTS = ('orders.task',)
CELERY_TASK_DEFAULT_QUEUE = 'celery'

I created a file called celery.py in the main app with the following contents:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'a_shop.settings')

app = Celery('a_shop')

app.config_from_object('django.conf:settings', namespace='CELERY')


app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
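As an aside, the `namespace='CELERY'` argument in `config_from_object` means that only Django settings prefixed with `CELERY_` are picked up, with the prefix stripped and the name lowercased (so `CELERY_BROKER_URL` becomes `broker_url`). A rough sketch of that filtering, as a simplified stand-in for what Celery does internally (not Celery's actual code):

```python
def filter_namespace(settings: dict, namespace: str) -> dict:
    """Keep only keys with the given prefix, strip it, and lowercase."""
    prefix = namespace + "_"
    return {k[len(prefix):].lower(): v
            for k, v in settings.items() if k.startswith(prefix)}

django_settings = {
    "CELERY_BROKER_URL": "amqp://guest:guest@localhost:5672//",
    "DEBUG": True,  # ignored: not in the CELERY_ namespace
}
print(filter_namespace(django_settings, "CELERY"))
# {'broker_url': 'amqp://guest:guest@localhost:5672//'}
```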

I created a simple task to test with:

import logging
from celery import shared_task
from .models import Order
logger = logging.getLogger(__name__)


@shared_task
def order_created(order_id):
    logger.info(f"Task order_created started with order_id: {order_id}")
    try:
        order = Order.objects.get(id=order_id)
        logger.info(f"Processing order: {order.name}")
        print(order_id)
    except Exception as e:
        logger.error(f"Error processing order {order_id}: {e}")
        raise
    logger.info(f"Task order_created completed with order_id: {order_id}")
    return order_id

I call the task in one of my views:

order_created.delay(new_order.id)

I can see the task being received in the Celery console:

[2024-06-20 15:59:43,220: INFO/MainProcess] Task orders.task.order_created[dbbcafd1-55f8-4e86-ae48-1c68fda5c75f] received
[2024-06-20 15:59:44,289: INFO/SpawnPoolWorker-9] child process 10608 calling self.run()
[2024-06-20 15:59:44,291: INFO/SpawnPoolWorker-10] child process 744 calling self.run()

But after a while I get this error in the console:

[2024-06-20 05:01:48,262: CRITICAL/MainProcess] Unrecoverable error: PreconditionFailed(406, 'PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more', (0, 0), '')
Traceback (most recent call last):
  File "E:\Projects\app\a_shop\Lib\site-packages\celery\worker\worker.py", line 203, in start
    self.blueprint.start(self)
  File "E:\Projects\app\a_shop\Lib\site-packages\celery\bootsteps.py", line 116, in start
    step.start(parent)
  File "E:\Projects\app\a_shop\Lib\site-packages\celery\bootsteps.py", line 365, in start
    return self.obj.start()
           ^^^^^^^^^^^^^^^^
  File "E:\Projects\app\a_shop\Lib\site-packages\celery\worker\consumer\consumer.py", line 332, in start
    blueprint.start(self)
  File "E:\Projects\app\a_shop\Lib\site-packages\celery\bootsteps.py", line 116, in start
    step.start(parent)
  File "E:\Projects\app\a_shop\Lib\site-packages\celery\worker\consumer\consumer.py", line 628, in start
    c.loop(*c.loop_args())
  File "E:\Projects\app\a_shop\Lib\site-packages\celery\worker\loops.py", line 130, in synloop
    connection.drain_events(timeout=2.0)
  File "E:\Projects\app\a_shop\Lib\site-packages\kombu\connection.py", line 341, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "E:\Projects\app\a_shop\Lib\site-packages\kombu\transport\pyamqp.py", line 171, in drain_events
    return connection.drain_events(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "E:\Projects\app\a_shop\Lib\site-packages\amqp\connection.py", line 526, in drain_events
    while not self.blocking_read(timeout):
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "E:\Projects\app\a_shop\Lib\site-packages\amqp\connection.py", line 532, in blocking_read
    return self.on_inbound_frame(frame)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "E:\Projects\app\a_shop\Lib\site-packages\amqp\method_framing.py", line 53, in on_frame
    callback(channel, method_sig, buf, None)
  File "E:\Projects\app\a_shop\Lib\site-packages\amqp\connection.py", line 538, in on_inbound_method
    return self.channels[channel_id].dispatch_method(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "E:\Projects\app\a_shop\Lib\site-packages\amqp\abstract_channel.py", line 156, in dispatch_method
    listener(*args)
  File "E:\Projects\app\a_shop\Lib\site-packages\amqp\channel.py", line 293, in _on_close
    raise error_for_code(
amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more
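The 1800000 ms in the message appears to be RabbitMQ's `consumer_timeout`, the per-channel delivery acknowledgement timeout, which defaults to 30 minutes: if a delivered message is not acknowledged within that window, the broker closes the channel. Assuming RabbitMQ 3.8.17 or newer, it can be raised in rabbitmq.conf, for example:

```ini
# rabbitmq.conf -- raise the delivery acknowledgement timeout
# from the 30-minute default to 2 hours (value is in milliseconds)
consumer_timeout = 7200000
```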

Where is the problem, and how can I fix it?

python django rabbitmq celery
1 Answer

My assumption is that in your case the database call and commit have not happened yet when the task runs.

Try using `delay_on_commit`:

- order_created.delay(new_order.id)
+ order_created.delay_on_commit(new_order.id)
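For intuition: `delay_on_commit` (available in Celery 5.4+ with the Django integration) defers publishing the message by registering it as a `transaction.on_commit` callback, so the task is only enqueued after the surrounding database transaction commits and the row is actually visible to the worker. A minimal sketch of those on-commit semantics, using a plain-Python stand-in for the transaction machinery (not Django's actual implementation):

```python
class FakeTransaction:
    """Simplified stand-in for Django's transaction.on_commit behavior."""

    def __init__(self):
        self.callbacks = []

    def on_commit(self, func):
        # Queue the callback instead of running it immediately.
        self.callbacks.append(func)

    def commit(self):
        # Run deferred callbacks only once the data is durable.
        for func in self.callbacks:
            func()
        self.callbacks.clear()

    def rollback(self):
        # A rolled-back transaction discards its callbacks entirely.
        self.callbacks.clear()


sent = []
txn = FakeTransaction()
# Analogue of order_created.delay_on_commit(new_order.id):
txn.on_commit(lambda: sent.append("order_created(42)"))
print(sent)   # [] -- nothing enqueued yet
txn.commit()
print(sent)   # ['order_created(42)'] -- enqueued only after commit
```

With plain `.delay()` the message can reach the worker before the transaction commits, so `Order.objects.get(id=order_id)` may not find the row yet.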

You can check the documentation for more information:

https://docs.celeryq.dev/en/latest/django/first-steps-with-django.html#using-the-shared-task-decorator
