我尝试通过设置并直接从 celery 文件配置代理。 以下适用于 celery 的设置。
AWS_SQS_SECRET = os.environ.get("AWS_SQS_SECRET")
broker_url = 'sqs://%s:%s@' % (AWS_SQS_ACCESS, AWS_SQS_SECRET)
task_default_queue = os.environ.get("DEFAULT_QUEUE")
AWS_SQS_REGION = os.environ.get("AWS_REGION")
broker_backend = "SQS"
broker_transport_options = {
"region": AWS_SQS_REGION,
# 'queue_name_prefix': '%s-' % 'dev' , # os.environ.get('ENVIRONMENT', 'development'),
'visibility_timeout': 7200,
'polling_interval': 1,
}
accept_content = ['application/json']
result_serializer = 'json'
task_serializer = 'json'
另外,正如我提到的,我尝试直接从 celery 文件进行配置。
import os
from celery import Celery
from celery.schedules import crontab
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'MyApp.settings')
AWS_SQS_ACCESS = os.environ.get("AWS_SQS_ACCESS")
AWS_SQS_SECRET = os.environ.get("AWS_SQS_SECRET")
app = Celery('MyApp') #,, broker='sqs://%s:%s@' % (AWS_SQS_ACCESS, AWS_SQS_SECRET), backend='django-db'
# app.config_from_object('django.conf:settings') #, namespace='CELERY'
CELERY_CONFIG = {
"CELERY_TASK_SERIALIZER": "json",
"CELERY_ACCEPT_CONTENT": ["json"],
"CELERY_RESULT_SERIALIZER": "json",
"CELERY_RESULT_BACKEND": None,
"CELERY_TIMEZONE": "America/Sao_Paulo",
"CELERY_ENABLE_UTC": True,
"CELERY_ENABLE_REMOTE_CONTROL": False,
}
BROKER_URL = 'sqs://%s:%s@' % (AWS_SQS_ACCESS, AWS_SQS_SECRET)
CELERY_CONFIG.update(
**{
"BROKER_URL": BROKER_URL,
"BROKER_TRANSPORT": "sqs",
"BROKER_TRANSPORT_OPTIONS": {
"region": "sa-east-1",
"visibility_timeout": 3600,
"polling_interval": 60,
},
}
)
app.conf.update(**CELERY_CONFIG)
app.autodiscover_tasks()
在 elastik beanstalk 上部署期间,在服务中我运行命令:
$PYTHONPATH/celery -A celery worker -Q default-dev -n default-worker \
--logfile=/var/log/celery/celery-stdout-error.log --loglevel=DEBUG --concurrency=1
之前尝试过跑步:
$PYTHONPATH/celery -A MyApp worker -Q default-dev -n default-worker \
--logfile=/var/log/celery/celery-stdout-error.log --loglevel=DEBUG --concurrency=1
但是出现错误,celery“无法加载应用程序 MyApp”。
在日志文件中我收到以下错误:
[2022-06-10 15:58:25,678: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 111] Connection refused.
Trying again in 6.00 seconds... (3/100)
我的celery版本是5.2.7
如果您仍然遇到此问题,则需要将
<celery_instance>.conf.broker_transport
设置为您正在使用的任何传输方式,在我的例子中,我想使用 redis,因此对于我的用例,我必须执行以下操作:
celery.conf.broker_transport = 'redis' # if you're using 'sqs' change to `sqs`, and so on and so forth.
基本上他们添加了一堆选项,你现在需要设置,我不知道为什么,但从文档中看它不是很明显(我认为它还没有更新 - 但我可能是错的)。
'broker_host': None,
'broker_login_method': None,
'broker_password': '',
'broker_pool_limit': 10,
'broker_port': None,
'broker_read_url': None,
'broker_transport': ''
如果未设置,则默认为它们所具有的预设,在传输情况下为“amqp”。我还必须设置
broker_password
,尽管我已经将其与 url 方案中的传输一起添加了 tranport://:pass@host:port
但是,似乎 broker_url
不再包含这些内容,您需要单独设置它们。它可能允许更多的控制,但我可能是错的。
如果您遇到同样的问题,只需尝试重新启动 celery 服务即可。