我正在尝试使用 Celery 来排队 HTTP 请求以发送到另一个服务。最重要的是没有数据丢失,所以每个请求至少发送一次。另外我想知道为什么任务即使重试也会失败。
我在这方面遇到过多次失败,所以我目前的解决方案是使用广泛的
autoretry_for
和 retry_backoff
和 acks_late
:
class SendTask(celery.Task):
"""Celery task to send data."""
name = "app_name.send_data"
autoretry_for = (Exception,)
retry_backoff = 2 # This is the initial retry delay in seconds.
retry_backoff_max = 24 * 3600 # This is the maximum amount of time between retries.
max_retries = None # Never stop trying.
# Risk sending one event more than once rather than losing it when Celery fails.
acks_late = True
reject_on_worker_lost = True
def __init__(self, client):
self.client = client # Wrapper around the requests library
def run(self, data, *args, **kwargs): # pylint: disable=arguments-differ
"""Send data using a post request."""
data.setdefault("date", datetime.utcnow().isoformat())
self.client.post("endpoint", json=data) # throws an exception on any failure or non 2xx HTTP response.
全局配置只是broker:
CELERY = {
"broker_url": "redis://localhost:6379/0",
}
Restoring xxx unacknowledged message(s)
,但工作人员在重启后从未开始重试。数据丢失。这是 Celery 能够且适合处理的事情吗?如果是这样,我的配置/对它应该如何工作的期望有什么明显的错误吗?