我已经启动了我的 celery Worker,它使用 RabbitMQ 作为代理,如下所示:
celery -A my_app worker -l info -P gevent -c 100 --prefetch-multiplier=1 -Q my_app
然后我的任务看起来很像这样:
@shared_task(queue='my_app', default_retry_delay=10, max_retries=1, time_limit=8 * 60)
def example_task():
# getting queryset with some filtering
my_models = MyModel.objects.filter(...)
for my_model in my_models.iterator():
my_model.execute_something()
有时此任务可以在不到一分钟的时间内完成,有时在高负载期间,需要超过 5 分钟才能完成。
主要问题是 RabbitMQ 不断地将我的工作人员从消费者列表中删除。看起来真的很随机。因此,我需要再次重新启动工作程序。
工作人员也开始抛出这些错误:
SSLEOFError: EOF occurred in violation of protocol (_ssl.c:2396)
有时会出现这些错误:
consumer: Cannot connect to amqps://my_app:**@example.com:5671/prod: SSLEOFError(8, 'EOF occurred in violation of protocol (_ssl.c:997)').
Couldn't ack 2057, reason:"RecoverableConnectionError(None, 'connection already closed', None, '')"
我尝试添加
--without-heartbeat
但没有任何作用。
如何解决这个问题?有时我的任务需要 30 多分钟才能完成,而且我无法持续监控工人是否被从rabbitmq踢出。
对于将来遇到这些问题的任何人来说,这似乎是 RabbitMQ 的问题。我尝试了 Redis,问题就消失了。不过,我想坚持使用 RabbitMQ 进行生产,所以我尝试了 LavinMQ,这是一个替代品,问题就消失了。