如何将 Celery 配置为可靠的发送队列

问题描述 投票:0回答:0

我正在尝试使用 Celery 来排队 HTTP 请求以发送到另一个服务。最重要的是没有数据丢失,所以每个请求至少发送一次。另外我想知道为什么任务即使重试也会失败。

我在这方面遇到过多次失败,所以我目前的解决方案是使用广泛的

autoretry_for
retry_backoff
acks_late

class SendTask(celery.Task):
    """Celery task to send data."""

    name = "app_name.send_data"
    autoretry_for = (Exception,)
    retry_backoff = 2  # This is the initial retry delay in seconds.
    retry_backoff_max = 24 * 3600  # This is the maximum amount of time between retries.
    max_retries = None  # Never stop trying.

    # Risk sending one event more than once rather than losing it when Celery fails.
    acks_late = True
    reject_on_worker_lost = True

    def __init__(self, client):
        self.client = client  # Wrapper around the requests library

    def run(self, data, *args, **kwargs):  # pylint: disable=arguments-differ
        """Send data using a post request."""
        data.setdefault("date", datetime.utcnow().isoformat())
        self.client.post("endpoint", json=data)  # throws an exception on any failure or non 2xx HTTP response.

全局配置只是broker:

CELERY = {
    "broker_url": "redis://localhost:6379/0",
}
  1. 即使重试成功,任务也会不断重试,导致大量重复。 → 没有日志消息。也没有可靠的方法来监控队列。
  2. 重启工作人员会导致任务丢失:虽然我在日志中收到消息说
    Restoring xxx unacknowledged message(s)
    ,但工作人员在重启后从未开始重试。数据丢失。

这是 Celery 能够且适合处理的事情吗?如果是这样,我的配置/对它应该如何工作的期望有什么明显的错误吗?

python celery
© www.soinside.com 2019 - 2024. All rights reserved.