Celery + Redis: strange connection error when a task takes 5 minutes?

Question (votes: 0, answers: 1)

I've been trying to debug a strange error: if a task takes about 5 minutes, it fails on the Celery worker. It doesn't show a useful error either, just this:

2024-11-14T07:25:40Z app[48e501ec74e118] den [info]Traceback (most recent call last):
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/celery/app/trace.py", line 520, in trace_task
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]    task.backend.mark_as_done(
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/celery/backends/base.py", line 157, in mark_as_done
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]    self.store_result(task_id, result, state, request=request)
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/celery/backends/base.py", line 526, in store_result
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]    self._store_result(task_id, result, state, traceback,
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/celery/backends/base.py", line 981, in _store_result
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]    self._set_with_state(self.get_key_for_task(task_id), self.encode(meta), state)
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/celery/backends/base.py", line 854, in _set_with_state
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]    return self.set(key, value)
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]           ^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/celery/backends/redis.py", line 400, in set
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]    return self.ensure(self._set, (key, value), **retry_policy)
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/celery/backends/redis.py", line 384, in ensure
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]    return retry_over_time(
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]           ^^^^^^^^^^^^^^^^
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/kombu/utils/functional.py", line 318, in retry_over_time
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]    return fun(*args, **kwargs)
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]           ^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/celery/backends/redis.py", line 409, in _set
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]    pipe.execute()
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1530, in execute
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]    return conn.retry.call_with_retry(
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/redis/retry.py", line 62, in call_with_retry
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]    return do()
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]           ^^^^
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1531, in <lambda>
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]    lambda: execute(conn, stack, raise_on_error),
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1416, in _execute_transaction
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]    self.raise_first_error(commands, response)
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1451, in raise_first_error
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]    raise r
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1385, in _execute_transaction
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]    self.parse_response(connection, "_")
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1462, in parse_response
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]    result = Redis.parse_response(self, connection, command_name, **options)
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 584, in parse_response
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]    response = connection.read_response()
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]               ^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/redis/connection.py", line 616, in read_response
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]    raise response
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]  warn(RuntimeWarning(
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]Traceback (most recent call last):
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/billiard/pool.py", line 1794, in safe_apply_callback
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]    fun(*args, **kwargs)
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/celery/worker/request.py", line 624, in on_failure
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]    self.task.backend.mark_as_failure(
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/celery/backends/base.py", line 167, in mark_as_failure
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]    self.store_result(task_id, exc, state,
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/celery/backends/base.py", line 526, in store_result
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]    self._store_result(task_id, result, state, traceback,
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/celery/backends/base.py", line 981, in _store_result
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]    self._set_with_state(self.get_key_for_task(task_id), self.encode(meta), state)
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/celery/backends/base.py", line 854, in _set_with_state
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]    return self.set(key, value)
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]           ^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/celery/backends/redis.py", line 400, in set
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]    return self.ensure(self._set, (key, value), **retry_policy)
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/celery/backends/redis.py", line 384, in ensure
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]    return retry_over_time(
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]           ^^^^^^^^^^^^^^^^
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/kombu/utils/functional.py", line 318, in retry_over_time
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]    return fun(*args, **kwargs)
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]           ^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/celery/backends/redis.py", line 409, in _set
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]    pipe.execute()
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1530, in execute
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]    return conn.retry.call_with_retry(
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/redis/retry.py", line 62, in call_with_retry
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]    return do()
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]           ^^^^
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1531, in <lambda>
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]    lambda: execute(conn, stack, raise_on_error),
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1416, in _execute_transaction
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]    self.raise_first_error(commands, response)
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1451, in raise_first_error
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]    raise r
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1385, in _execute_transaction
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]    self.parse_response(connection, "_")
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1462, in parse_response
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]    result = Redis.parse_response(self, connection, command_name, **options)
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 584, in parse_response
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]    response = connection.read_response()
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]               ^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]  File "/app/.venv/lib/python3.12/site-packages/redis/connection.py", line 616, in read_response
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]    raise response

Here is the task:

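from typing import Any

# Not shown in the question, assumed to be defined elsewhere in the project:
# tracer (OpenTelemetry tracer), log (a structlog-style logger that accepts
# keyword arguments), whisper_endpoint (a runpod.Endpoint),
# transliterated_words and RunpodResponseEmptyError.

@tracer.start_as_current_span("transcribe_runpod")
def transcribe_runpod(audio_url: str, primary_language: str) -> tuple[str, Any]:
    log.info(f"Transcribing audio from {audio_url} with primary language {primary_language}")

    # run_sync blocks until the RunPod job returns, so for long audio the
    # Celery worker stays inside this call for several minutes.
    response = whisper_endpoint.run_sync({
        "input": {
            "model": "large-v2",
            "audio": audio_url,
            "word_timestamps": True,
            "language": primary_language,
            "logprob_threshold": None,
            "no_speech_threshold": None,
            "condition_on_previous_text": True,
            "temperature": 0.0,
            "initial_prompt": transliterated_words,
        }
    })

    if not response or "transcription" not in response or "word_timestamps" not in response:
        raise RunpodResponseEmptyError()

    log.info("Transcribed audio", response=response)
    # Returns a (transcription, word_timestamps) tuple.
    return response["transcription"], response["word_timestamps"]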

It is called by this task:

@celery.task(bind=True)
def transcribe_runpod_task(self, audio_url: str, primary_language: str):
    return transcribe_runpod(audio_url, primary_language)

Shorter audio files work fine, but something about this larger file causes what looks like a timeout.

The problem occurs with celery[redis] 5.4.0 and also with 5.5.0rc1. Am I missing some configuration?

Tags: python, redis, celery

Answer (votes: 0)

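When a long-running Celery task (around 5 minutes or more) uses the Redis result backend, it runs into what looks like a timeout. The traceback shows where this happens. The task function itself finishes, and the exception is raised afterwards in mark_as_done → store_result → pipe.execute(), that is, while Celery writes the result to Redis.

The second traceback shows that recording the failure (mark_as_failure) fails in the same way. The final line naming the Redis error is cut off in the log. Even so, the Redis backend appears unable to store the result, either because the connection timed out or because of some limit related to long task durations.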

To address this, you can define a reusable base Celery task class with retry logic and use it in your task definitions:

First, create a base class that handles retries and includes hooks for logging successes, failures and retry attempts:

import logging

from celery import Task

log = logging.getLogger(__name__)

class BaseTaskWithRetry(Task):
    # Retry automatically on any exception raised inside the task body.
    autoretry_for = (Exception,)
    max_retries = 3          # Celery's default, made explicit
    retry_backoff = True     # exponential backoff: 1 s, 2 s, 4 s, ...
    retry_backoff_max = 60   # cap each delay at 60 seconds
    retry_jitter = True      # pick a random delay between 0 and the computed value
    acks_late = True         # acknowledge the message only after the task has run

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        log.warning(f"Retrying task {task_id} due to {exc}. Args: {args}, Kwargs: {kwargs}")

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        log.error(f"Task {task_id} failed due to {exc}. Args: {args}, Kwargs: {kwargs}", exc_info=True)

    def on_success(self, retval, task_id, args, kwargs):
        log.info(f"Task {task_id} succeeded. Return value: {retval}. Args: {args}, Kwargs: {kwargs}")

Now use the BaseTaskWithRetry class to define your Celery task:

from celery import shared_task

# transcribe_runpod is the function from the question; log is the logger defined above.

@shared_task(bind=True, base=BaseTaskWithRetry, soft_time_limit=1800, time_limit=1850)
def transcribe_runpod_task(self, audio_url: str, primary_language: str):
    """
    Celery task to transcribe audio using Runpod.
    Retries automatically on failure via BaseTaskWithRetry.autoretry_for.
    """
    log.info(f"Starting transcription for {audio_url} with language {primary_language}")
    # No try/except needed: autoretry_for on the base class catches the exception
    # and retries with the configured exponential backoff. A manual
    # self.retry(exc=exc) without a countdown would use default_retry_delay
    # (180 s) instead. Celery also calls on_success itself after the task
    # returns, so calling it here would log every success twice.
    return transcribe_runpod(audio_url, primary_language)

To trigger the task, you can use the following code (countdown=20 delays the start by 20 seconds):

transcribe_runpod_task.apply_async(
    args=["https://example.com/audio.mp3", "en"],
    countdown=20
)
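Task-level retries only wrap the task body. In the question's traceback, the exception is raised after the body has returned, inside mark_as_done, so autoretry_for would not catch it. The failure-state write in the second traceback fails the same way.

A complementary lever is the Redis result backend's own connection and retry settings. Below is a hedged sketch of a celeryconfig.py module, loaded with app.config_from_object("celeryconfig"). Keep in mind:

- The setting names are Celery configuration options.
- The URL and all numeric values are placeholder assumptions to tune.
- Whether these settings cure this particular error depends on what the truncated Redis error actually is.

```python
# celeryconfig.py: load with app.config_from_object("celeryconfig").
# Placeholder URL (assumption): replace with your real Redis result backend.
result_backend = "redis://localhost:6379/0"

# Before reusing a pooled result-backend connection that has been idle longer
# than this many seconds, send a PING so a connection dropped during a long
# task is detected instead of failing on the first real command.
redis_backend_health_check_interval = 30

# Enable TCP keepalive on result-backend connections.
redis_socket_keepalive = True

# Retry result-backend reads/writes that fail with a Redis timeout.
redis_retry_on_timeout = True

# Retry backend errors that Celery treats as recoverable instead of failing
# immediately; the retry count here is an assumed value.
result_backend_always_retry = True
result_backend_max_retries = 5
```

The health check targets the pattern in the question: the worker's result-backend connection sits idle for the whole task, then fails on the first write.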
© www.soinside.com 2019 - 2024. All rights reserved.