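I tested this on Windows and it works, but now I want to run it with Docker. The problem is that when I try to execute the task that sends an email to the user, I get the error
[Errno 111] Connection refused
even though Celery starts successfully and connects to RabbitMQ. Why can't Celery send tasks to RabbitMQ?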
Traceback (most recent call last):
File "/usr/local/lib/python3.11/dist-packages/kombu/utils/functional.py", line 32, in __call__
return self.__value__
^^^^^^^^^^^^^^
During handling of the above exception ('ChannelPromise' object has no attribute '__value__'), another exception occurred:
File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 472, in _reraise_as_library_errors
yield
^^^^^
File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 459, in _ensure_connection
return retry_over_time(
File "/usr/local/lib/python3.11/dist-packages/kombu/utils/functional.py", line 318, in retry_over_time
return fun(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 941, in _connection_factory
self._connection = self._establish_connection()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 867, in _establish_connection
conn = self.transport.establish_connection()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/dist-packages/kombu/transport/pyamqp.py", line 203, in establish_connection
conn.connect()
^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/dist-packages/amqp/connection.py", line 323, in connect
self.transport.connect()
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/dist-packages/amqp/transport.py", line 129, in connect
self._connect(self.host, self.port, self.connect_timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/dist-packages/amqp/transport.py", line 184, in _connect
self.sock.connect(sa)
^^^^^^^^^^^^^^^^^^^^^
The above exception ([Errno 111] Connection refused) was the direct cause of the following exception:
File "/usr/local/lib/python3.11/dist-packages/django/core/handlers/exception.py", line 55, in inner
response = get_response(request)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/dist-packages/django/core/handlers/base.py", line 197, in _get_response
response = wrapped_callback(request, *callback_args, **callback_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/website/journal_website/views.py", line 281, in register_new_user
send_email_message_to_user_with_activation_link.delay(new_user.pk, new_user_additional_data.code)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/dist-packages/celery/app/task.py", line 444, in delay
return self.apply_async(args, kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/dist-packages/celery/app/task.py", line 594, in apply_async
return app.send_task(
File "/usr/local/lib/python3.11/dist-packages/celery/app/base.py", line 798, in send_task
amqp.send_task_message(P, name, message, **options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/dist-packages/celery/app/amqp.py", line 517, in send_task_message
ret = producer.publish(
File "/usr/local/lib/python3.11/dist-packages/kombu/messaging.py", line 186, in publish
return _publish(
File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 563, in _ensured
return fun(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/dist-packages/kombu/messaging.py", line 195, in _publish
channel = self.channel
^^^^^^^^^^^^
File "/usr/local/lib/python3.11/dist-packages/kombu/messaging.py", line 218, in _get_channel
channel = self._channel = channel()
^^^^^^^^^
File "/usr/local/lib/python3.11/dist-packages/kombu/utils/functional.py", line 34, in __call__
value = self.__value__ = self.__contract__()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/dist-packages/kombu/messaging.py", line 234, in <lambda>
channel = ChannelPromise(lambda: connection.default_channel)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 960, in default_channel
self._ensure_connection(**conn_opts)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 458, in _ensure_connection
with ctx():
^^^^^
File "/usr/lib/python3.11/contextlib.py", line 155, in __exit__
self.gen.throw(typ, value, traceback)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/dist-packages/kombu/connection.py", line 476, in _reraise_as_library_errors
raise ConnectionError(str(exc)) from exc
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
docker-compose.yml:
version: "3.0"
services:
  # WEB
  django:
    build: .
    command: python3.11 manage.py runserver 0.0.0.0:8000
    container_name: django-server
    volumes:
      - media_volume:/website/journal_website/media
      - static_volume:/website/journal_website/static
      - database_volume:/website/journal_website/database
    ports:
      - "8000:8000"
    depends_on:
      - rabbit
  # Celery
  celery:
    build: .
    command: celery -A website worker -l info
    container_name: celery
    depends_on:
      - rabbit
  # RabbitMQ
  rabbit:
    hostname: rabbit
    container_name: rabbitmq
    image: rabbitmq:3.12-rc-management
    ports:
      # AMQP protocol port
      - "5672:5672"
      # HTTP management UI
      - "15672:15672"
    restart: always
volumes:
  media_volume:
  static_volume:
  database_volume:
celery.py:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "website.settings")
celery_application = Celery("website")
celery_application.config_from_object("django.conf:settings", namespace="CELERY")
celery_application.conf.broker_url = "amqp://rabbit:5672"
celery_application.autodiscover_tasks()
tasks.py:
from __future__ import absolute_import, unicode_literals
from celery import shared_task
# Some imports...
@shared_task
def send_email_message_to_user_with_activation_link(target_user_id: int, code: UUID) -> HttpResponse | None:
    target_user = User.objects.get(pk=target_user_id)
    content = {
        "email": target_user.email,
        "domain": "127.0.0.1:8000",
        "site_name": "Website",
        "user": target_user,
        "protocol": "http",
        "code": code,
    }
    message = render_to_string("user/account_activation/account_activation_email.txt", content)
    try:
        send_mail("Account activation", message, "[email protected]", [target_user.email], fail_silently=False)
    except BadHeaderError:
        return HttpResponse("Invalid header found.")
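I tried your docker-compose file and it works fine for me. The only issue is that RabbitMQ takes about 20 seconds to start and accept connections. Until it is up, you may see connection timeout errors. Once it is up, the Celery worker establishes the connection and works normally.
For development, it is best to leave the rabbitmq container running and restart only the celery container when you make changes. You will probably still hit the timeout whenever rabbitmq itself is starting.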
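One way to ride out that startup window is to wait until the broker port accepts TCP connections before starting anything that publishes tasks. The following is a minimal sketch using only the standard library; `wait_for_port` is a hypothetical helper, not part of Celery or RabbitMQ, and the host name `rabbit` and port 5672 are taken from the compose file above. An open port only shows that something is listening; it does not prove that RabbitMQ has finished initializing.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Covers "Connection refused", timeouts and name-resolution failures.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

Inside the django or celery container this could be called as `wait_for_port("rabbit", 5672)` before the main command runs; a `False` result after the timeout suggests the broker never came up or the host name is wrong.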
Solved. It works on Linux with WSL 2. First, I needed to add some code to the __init__.py file in the project directory:
from .celery import celery_application as celery_app
__all__ = ["celery_app"]
Then, in the docker-compose file, I used a common database volume for the django and celery containers:
  # Celery worker
  celery_worker:
    command: celery -A website worker -l info
    container_name: celery_worker
    volumes:
      - database_volume:/website/journal_website/database # just add this to celery worker service.
    image: django-image # you need to use common image for django and celery containers that is created from Dockerfile.
    depends_on:
      - rabbitmq
      - django
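If the same error shows up again, a quick check from inside the django container can show which broker address is actually being refused. This is a rough sketch under stated assumptions: `probe_broker` is a hypothetical helper written for this illustration, it uses only the standard library, and it assumes the standard AMQP port 5672 when the URL does not give one.

```python
import socket
from urllib.parse import urlsplit


def probe_broker(url: str, timeout: float = 3.0) -> str:
    """Classify one TCP connection attempt to the host and port named in url."""
    parts = urlsplit(url)
    host = parts.hostname or "localhost"
    port = parts.port or 5672  # assumption: standard AMQP port when none is given
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return "reachable"
    except ConnectionRefusedError:
        return "refused"  # errno 111 on Linux: nothing is listening there
    except socket.gaierror:
        return "unresolvable"  # the host name is not known on this network
    except OSError:
        return "unreachable"  # timeouts and other network errors
```

Comparing `probe_broker("amqp://rabbit:5672")` with `probe_broker("amqp://localhost:5672")` from inside the django container may help: if only the localhost address is refused while `rabbit` is reachable, the process is probably not picking up the broker URL set in celery.py, which is consistent with the `__init__.py` fix above.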