I'm running Redis, Celery 4.0, and Django 1.10, but I get an [Errno 61] Connection refused error when I run the task "test" from the shell. Here is my project structure:
myproj
├── app1
│   ├── __init__.py
│   └── tasks.py
└── myproj
    ├── __init__.py
    ├── urls.py
    ├── settings
    │   ├── __init__.py
    │   ├── base.py
    │   └── local.py
    └── local
        ├── __init__.py
        ├── celery.py
        └── wsgi.py
myproj/app1/tasks.py:
from __future__ import absolute_import
from celery import task
@task(name='app1.tasks.test')
def test():
    print('this is a test')
myproj/myproj/local/celery.py:
from __future__ import absolute_import
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproj.settings.local')
app = Celery('myproj_local')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks()
myproj/myproj/local/__init__.py:
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']
I think something is wrong with this __init__ file, because the task runs from the shell when I move its contents to myproj/myproj/__init__.py:
from __future__ import absolute_import, unicode_literals
from .local.celery import app as celery_app
__all__ = ['celery_app']
Celery is run from the myproj directory with the command:
celery -A myproj.local.celery worker -l info
Full error:
python manage.py shell --settings=myproj.settings.local
(InteractiveConsole)
>>> from app1.tasks import test
>>> test.delay()
Traceback (most recent call last):
File "<console>", line 1, in <module>
File env/lib/python2.7/site-packages/celery/app/task.py", line 413, in delay
return self.apply_async(args, kwargs)
File env/lib/python2.7/site-packages/celery/app/task.py", line 536, in apply_async
**options
File env/lib/python2.7/site-packages/celery/app/base.py", line 717, in send_task
amqp.send_task_message(P, name, message, **options)
File env/lib/python2.7/site-packages/celery/app/amqp.py", line 554, in send_task_message
**properties
File env/lib/python2.7/site-packages/kombu/messaging.py", line 178, in publish
exchange_name, declare,
File env/lib/python2.7/site-packages/kombu/connection.py", line 527, in _ensured
errback and errback(exc, 0)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/contextlib.py", line 35, in __exit__
self.gen.throw(type, value, traceback)
File env/lib/python2.7/site-packages/kombu/connection.py", line 419, in _reraise_as_library_errors
sys.exc_info()[2])
File env/lib/python2.7/site-packages/kombu/connection.py", line 414, in _reraise_as_library_errors
yield
File env/lib/python2.7/site-packages/kombu/connection.py", line 515, in _ensured
reraise_as_library_errors=False,
File env/lib/python2.7/site-packages/kombu/connection.py", line 405, in ensure_connection
callback)
File env/lib/python2.7/site-packages/kombu/utils/functional.py", line 333, in retry_over_time
return fun(*args, **kwargs)
File env/lib/python2.7/site-packages/kombu/connection.py", line 261, in connect
return self.connection
File env/lib/python2.7/site-packages/kombu/connection.py", line 802, in connection
self._connection = self._establish_connection()
File env/lib/python2.7/site-packages/kombu/connection.py", line 757, in _establish_connection
conn = self.transport.establish_connection()
File env/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 130, in establish_connection
conn.connect()
File env/lib/python2.7/site-packages/amqp/connection.py", line 294, in connect
self.transport.connect()
File env/lib/python2.7/site-packages/amqp/transport.py", line 103, in connect
self._connect(self.host, self.port, self.connect_timeout)
File env/lib/python2.7/site-packages/amqp/transport.py", line 144, in _connect
self.sock.connect(sa)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/socket.py", line 228, in meth
return getattr(self._sock,name)(*args)
OperationalError: [Errno 61] Connection refused
Yes, I figured it out! I had the same problem. In the file myproj/app1/tasks.py:
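from __future__ import absolute_import
from celery import Celery  # the Celery class is needed to create the app below

# Create the app in this module and load its settings from Django
app = Celery('myproj_local')
app.config_from_object('django.conf:settings')

@app.task(name='random_name')
def test():
    print('this is a test')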
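Celery is then initialized through "app", the configuration is loaded into "app", and your task can be delayed.
But it only works if your configuration is valid.
You need to set BROKER_URL to point to Redis.
If you only have one queue, make sure your worker is connected to the default queue.
If you specified a default queue in your settings, you need to tell your worker to pick up tasks from it, like this: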
celery -A myproj.local.celery worker -l info -Q queue_name
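For reference, the relevant settings could look roughly like the sketch below. The Redis host, port, and database number are assumptions (a local Redis with default settings), and "queue_name" is just the placeholder from the command above. The old-style uppercase names are used because the app loads 'django.conf:settings' without a namespace.

```python
# myproj/settings/local.py (sketch; the values are assumptions, adjust to your setup)

# Point Celery at Redis instead of the default amqp://guest@127.0.0.1:5672//
BROKER_URL = 'redis://localhost:6379/0'

# Only needed if you want a non-default queue; it must match the -Q option
# passed to the worker.
CELERY_DEFAULT_QUEUE = 'queue_name'
```

If CELERY_DEFAULT_QUEUE is left out, tasks go to the queue named "celery", and the worker consumes from it without any -Q option.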
I had the same problem, and while debugging I could see that the following code:
from my_app.tasks import my_task
my_task.delay()
raised the exception:
[Errno 61] Connection refused
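This happened because my Celery configuration was never executed, so Celery fell back to its default configuration. In
Connection.ensure._ensured
it was trying to use this client: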
<Connection: amqp://guest:**@127.0.0.1:5672// at 0x16c5dfd50>
I was trying to configure Redis and expected
my_project/celery.py
to have been executed, but it had not been.
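A quick way to check which endpoint is refusing the connection is to test the TCP port taken from the broker URL. This is only a sketch, assuming Python 3 and using nothing but the standard library. broker_endpoint and broker_reachable are hypothetical helper names, and the Redis URL in the usage lines is an assumed local default rather than a value from my settings.

```python
import socket
from urllib.parse import urlparse

# Default ports for the two transports discussed in this thread.
DEFAULT_PORTS = {"amqp": 5672, "redis": 6379}


def broker_endpoint(url):
    """Return (scheme, host, port) parsed from a broker URL."""
    parts = urlparse(url)
    port = parts.port or DEFAULT_PORTS.get(parts.scheme)
    return parts.scheme, parts.hostname or "localhost", port


def broker_reachable(url, timeout=1.0):
    """Return True if something accepts TCP connections at the broker's host and port."""
    _, host, port = broker_endpoint(url)
    if port is None:
        return False  # unknown scheme and no explicit port in the URL
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers "Connection refused" as well as timeouts and DNS failures.
        return False


if __name__ == "__main__":
    for url in ("amqp://guest:**@127.0.0.1:5672//", "redis://localhost:6379/0"):
        print(url, broker_endpoint(url), broker_reachable(url))
```

If the amqp URL on port 5672 is refused while the Redis URL is reachable, Redis itself is fine and the problem is that the Redis settings were never loaded into the app that sends the task.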
The fix is to load the Celery configuration in the Django project's __init__ file. Here are the contents of
my_project/__init__.py
that load the Celery configuration correctly:
from .celery import app as celery_app
__all__ = ('celery_app',)
After adding this, calling the
delay
method no longer raises the exception, and the job is queued correctly.
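The reason the project-level __init__.py is the right place is that Python only runs a package's __init__.py when that package, or something underneath it, is imported. In the question's layout, loading myproj.settings.local imports myproj and myproj.settings, but presumably nothing imports myproj.local, so its __init__.py never runs. The sketch below reproduces this with a throwaway package. It assumes Python 3, and demo_proj is a made-up name that mirrors that layout.

```python
import importlib
import os
import sys
import tempfile


def make_package(root, *parts):
    """Create a package directory with an empty __init__.py and return its path."""
    path = os.path.join(root, *parts)
    os.makedirs(path)
    open(os.path.join(path, "__init__.py"), "w").close()
    return path


# Build demo_proj/{settings,local}/ in a temporary directory.
root = tempfile.mkdtemp()
make_package(root, "demo_proj")
settings_dir = make_package(root, "demo_proj", "settings")
make_package(root, "demo_proj", "local")
open(os.path.join(settings_dir, "local.py"), "w").close()

# Import only the settings module, as Django does for --settings=...
sys.path.insert(0, root)
importlib.invalidate_caches()
importlib.import_module("demo_proj.settings.local")

loaded = sorted(name for name in sys.modules if name.startswith("demo_proj"))
print(loaded)
# ['demo_proj', 'demo_proj.settings', 'demo_proj.settings.local']
```

demo_proj.local is missing from the list, which is why a Celery app imported only from local/__init__.py is never created when you call delay from the Django shell.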