Setting up Celery with RabbitMQ


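I am trying to set up Celery (5.2.7) with RabbitMQ (3.8.19) in a Django 4.1.5 project. The task appears in the "celery" queue in the "Unacked" state, and after 30 minutes it moves to the "Ready" state with this message: "amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more"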
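The 1800000 ms quoted in the error matches the 30 minutes observed before the message returns to "Ready". It is the broker's delivery acknowledgement timeout, which suggests the worker never acknowledged the delivery. A quick check of the arithmetic (the variable names are only illustrative):

```python
from datetime import timedelta

# Timeout value quoted in the PRECONDITION_FAILED message, in milliseconds.
ack_timeout_ms = 1_800_000

ack_timeout = timedelta(milliseconds=ack_timeout_ms)
print(ack_timeout)                       # 0:30:00
print(ack_timeout.total_seconds() / 60)  # 30.0
```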

When the function is called directly (without .delay), everything works.

I cannot figure out which part of the configuration is wrong.

Starting the Celery worker:

$ celery -A mylocal worker -l info

 -------------- celery@DEV2 v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-1
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         mylocal:0x18205ade830
- ** ---------- .> transport:   amqp://<RMQLogin>:**@<RMQ_HOST>:<RMQ_PORT>/<RMQ_VIRTUAL_HOST>
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . myapp.tasks.send_spam_email
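One thing the banner suggests is worth ruling out: the worker runs on Windows with the default prefork pool, and Celery has not officially supported Windows since 4.x. A commonly reported symptom in that setup is that tasks are received but never executed, and a commonly suggested workaround is the single-process solo pool (`--pool=solo`). This is an assumption about the cause, not a confirmed diagnosis. A minimal sketch of choosing the worker arguments by platform; `worker_argv` is a hypothetical helper, not part of Celery:

```python
import platform
import shlex


def worker_argv(app_name, system=None):
    """Build `celery` CLI arguments, switching to the solo pool on Windows."""
    system = system or platform.system()
    argv = ["celery", "-A", app_name, "worker", "-l", "info"]
    if system == "Windows":
        # Assumption: prefork is the culprit; solo runs tasks in the main process.
        argv.append("--pool=solo")
    return argv


print(shlex.join(worker_argv("mylocal", system="Windows")))
# celery -A mylocal worker -l info --pool=solo
```

If the tasks start executing with the solo pool, the prefork pool on Windows is the likely cause.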

Project structure:

mylocal
------|
------mylocal
------------|
------------- __init__.py
------------- celery.py
------------- settings.py
------myapp
----------|
---------- templates
--------------|
---------------myapp
-----------------|
--------------------celery.html
---------- admin.py
---------- forms.py
---------- models.py
---------- service.py
---------- tasks.py
---------- urls.py
---------- views.py
**__init__.py**

from .celery import app as celery_app

__all__ = ('celery_app', )


**celery.py**

import os
from celery import Celery
from celery.schedules import crontab

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mylocal.settings')

app = Celery('mylocal')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()


**settings.py**

RMQ_HOST = '10.10.10.10'
RMQ_PORT = '5672'
RMQ_VIRTUAL_HOST = 'VHost'
RMQ_LOGIN = 'RMQLogin'
RMQ_PASSWORD = 'RMQPassword'
CELERY_BROKER_URL = 'amqp://' + RMQ_LOGIN + ':' + RMQ_PASSWORD + '@' + RMQ_HOST + ':' + RMQ_PORT + '/' + RMQ_VIRTUAL_HOST
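Concatenating the parts works as long as none of them contains characters that are special in a URL. A slightly more defensive sketch percent-encodes the credentials and the virtual host first. `build_broker_url` is a hypothetical helper, and the values are the placeholders from settings.py:

```python
from urllib.parse import quote


def build_broker_url(login, password, host, port, vhost):
    """Assemble an amqp:// URL, percent-encoding user, password and vhost."""
    return (
        f"amqp://{quote(login, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(vhost, safe='')}"
    )


CELERY_BROKER_URL = build_broker_url(
    "RMQLogin", "RMQPassword", "10.10.10.10", "5672", "VHost"
)
print(CELERY_BROKER_URL)
# amqp://RMQLogin:RMQPassword@10.10.10.10:5672/VHost
```

With these placeholder values the result is identical to the concatenated string, so this would not by itself explain the unacknowledged tasks.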


**admin.py**
from .models import Contact
from django.contrib import admin

@admin.register(Contact)
class ContactAdmin(admin.ModelAdmin):
    list_display = ('name', 'email')


**forms.py**
from django import forms
from .models import Contact
class ContactForm(forms.ModelForm):
    class Meta:
        model = Contact
        fields = '__all__'
        
        
**models.py**
from django.db import models
class Contact(models.Model):
    name = models.CharField(max_length=30)
    email = models.CharField(max_length=50)

    def __str__(self):
        return self.name


**service.py**
from django.core.mail import send_mail


def send(user_email):
    send_mail(
        'Hello',
        'Body goes here',
        '[email protected]',
        [user_email, ],
        fail_silently=False,
    )



**tasks.py**
from mylocal.celery import app
from .service import send

@app.task
def send_spam_email(user_email):
    send(user_email)


**urls.py**
from .views import *
from django.urls import path


app_name = 'myapp'

urlpatterns = [
    path('celery/', ContactView.as_view(), name='celery'),

]



**views.py**

from django.urls import reverse_lazy
from django.views.generic import CreateView

from .forms import ContactForm
from .models import Contact
from .tasks import send_spam_email


class ContactView(CreateView):
    model = Contact
    form_class = ContactForm
    success_url = reverse_lazy('myapp:celery')
    template_name = 'myapp/celery.html'

    def form_valid(self, form):
        form.save()
        # send(form.instance.email)
        send_spam_email.delay(form.instance.email)
        return super().form_valid(form)


**celery.html**

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<form action="{% url 'myapp:celery' %}" method="post">
  {% csrf_token %}
  {{form.as_p}}
  <button type="submit">Send</button>
</form>
</body>
</html>

2023-03-09

I also tried the following configuration, with the same result. Added to settings.py:

CELERY_RESULT_BACKEND = 'rpc://'


$ celery -A mylocal worker -l info -E

-------------- celery@DEV2 v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-1
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         mylocal:0x1e64be5e800
- ** ---------- .> transport:   amqp://<RMQLogin>:**@<RMQ_HOST>:<RMQ_PORT>/<RMQ_VIRTUAL_HOST>
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: ON
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

....
[2023-03-09 12:09:43,316: INFO/MainProcess] Task myapp.tasks.send_spam_email[5cebb2d5-5d19-44e5-b638-f4f86b201df4] received
[2023-03-09 12:09:43,317: INFO/MainProcess] Task myapp.tasks.send_spam_email[f8d7243b-d75e-46c8-b733-4ec080500e60] received
[2023-03-09 12:09:43,318: INFO/MainProcess] Task myapp.tasks.send_spam_email[03ce1b24-4d97-41c5-8765-cd71850d452f] received
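The log shows each task being received, but no line reporting that it finished. A small sketch that makes this check mechanical: it scans worker log lines and lists the task ids that were received but never reported as succeeded. The "succeeded" wording is assumed from Celery's default log messages, `unfinished_tasks` is a hypothetical helper, and failed tasks would also be listed because only success lines are matched.

```python
import re

# Matches e.g. "Task myapp.tasks.send_spam_email[<uuid>] received".
TASK_LINE = re.compile(
    r"Task (?P<name>[\w.]+)\[(?P<id>[0-9a-f-]+)\] (?P<event>received|succeeded)"
)


def unfinished_tasks(lines):
    """Return ids of tasks that were received but have no 'succeeded' line."""
    received, succeeded = [], set()
    for line in lines:
        match = TASK_LINE.search(line)
        if not match:
            continue
        if match["event"] == "received":
            received.append(match["id"])
        else:
            succeeded.add(match["id"])
    return [task_id for task_id in received if task_id not in succeeded]


# The three log lines from the worker output above.
log = [
    "[2023-03-09 12:09:43,316: INFO/MainProcess] Task myapp.tasks.send_spam_email[5cebb2d5-5d19-44e5-b638-f4f86b201df4] received",
    "[2023-03-09 12:09:43,317: INFO/MainProcess] Task myapp.tasks.send_spam_email[f8d7243b-d75e-46c8-b733-4ec080500e60] received",
    "[2023-03-09 12:09:43,318: INFO/MainProcess] Task myapp.tasks.send_spam_email[03ce1b24-4d97-41c5-8765-cd71850d452f] received",
]
for task_id in unfinished_tasks(log):
    print(task_id)
```

For the log above, all three task ids are reported as unfinished, which is consistent with the messages staying "Unacked" on the broker.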
python django rabbitmq celery