我一直在尝试调试这个奇怪的错误,如果任务接管了大约 5m,任务就会在 celery 工作线程上失败。它也没有显示有用的错误,只是这样:
2024-11-14T07:25:40Z app[48e501ec74e118] den [info]Traceback (most recent call last):
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/celery/app/trace.py", line 520, in trace_task
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] task.backend.mark_as_done(
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/celery/backends/base.py", line 157, in mark_as_done
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] self.store_result(task_id, result, state, request=request)
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/celery/backends/base.py", line 526, in store_result
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] self._store_result(task_id, result, state, traceback,
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/celery/backends/base.py", line 981, in _store_result
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] self._set_with_state(self.get_key_for_task(task_id), self.encode(meta), state)
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/celery/backends/base.py", line 854, in _set_with_state
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] return self.set(key, value)
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] ^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/celery/backends/redis.py", line 400, in set
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] return self.ensure(self._set, (key, value), **retry_policy)
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/celery/backends/redis.py", line 384, in ensure
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] return retry_over_time(
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] ^^^^^^^^^^^^^^^^
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/kombu/utils/functional.py", line 318, in retry_over_time
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] return fun(*args, **kwargs)
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] ^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/celery/backends/redis.py", line 409, in _set
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] pipe.execute()
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1530, in execute
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] return conn.retry.call_with_retry(
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] ^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/redis/retry.py", line 62, in call_with_retry
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] return do()
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] ^^^^
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1531, in <lambda>
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] lambda: execute(conn, stack, raise_on_error),
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1416, in _execute_transaction
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] self.raise_first_error(commands, response)
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1451, in raise_first_error
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] raise r
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1385, in _execute_transaction
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] self.parse_response(connection, "_")
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1462, in parse_response
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] result = Redis.parse_response(self, connection, command_name, **options)
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 584, in parse_response
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] response = connection.read_response()
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] ^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/redis/connection.py", line 616, in read_response
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] raise response
2024-11-14T07:25:40Z app[48e501ec74e118] den [info] warn(RuntimeWarning(
2024-11-14T07:25:41Z app[48e501ec74e118] den [info]Traceback (most recent call last):
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/billiard/pool.py", line 1794, in safe_apply_callback
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] fun(*args, **kwargs)
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/celery/worker/request.py", line 624, in on_failure
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] self.task.backend.mark_as_failure(
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/celery/backends/base.py", line 167, in mark_as_failure
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] self.store_result(task_id, exc, state,
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/celery/backends/base.py", line 526, in store_result
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] self._store_result(task_id, result, state, traceback,
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/celery/backends/base.py", line 981, in _store_result
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] self._set_with_state(self.get_key_for_task(task_id), self.encode(meta), state)
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/celery/backends/base.py", line 854, in _set_with_state
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] return self.set(key, value)
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] ^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/celery/backends/redis.py", line 400, in set
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] return self.ensure(self._set, (key, value), **retry_policy)
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/celery/backends/redis.py", line 384, in ensure
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] return retry_over_time(
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] ^^^^^^^^^^^^^^^^
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/kombu/utils/functional.py", line 318, in retry_over_time
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] return fun(*args, **kwargs)
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] ^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/celery/backends/redis.py", line 409, in _set
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] pipe.execute()
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1530, in execute
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] return conn.retry.call_with_retry(
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] ^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/redis/retry.py", line 62, in call_with_retry
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] return do()
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] ^^^^
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1531, in <lambda>
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] lambda: execute(conn, stack, raise_on_error),
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1416, in _execute_transaction
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] self.raise_first_error(commands, response)
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1451, in raise_first_error
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] raise r
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1385, in _execute_transaction
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] self.parse_response(connection, "_")
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 1462, in parse_response
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] result = Redis.parse_response(self, connection, command_name, **options)
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/redis/client.py", line 584, in parse_response
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] response = connection.read_response()
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] ^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] File "/app/.venv/lib/python3.12/site-packages/redis/connection.py", line 616, in read_response
2024-11-14T07:25:41Z app[48e501ec74e118] den [info] raise response
这是任务:
@tracer.start_as_current_span("transcribe_runpod")
def transcribe_runpod(audio_url: str, primary_language: str) -> dict[str, str]:
log.info(f"Transcribing audio from {audio_url} with primary language {primary_language}")
response = whisper_endpoint.run_sync({
"input": {
"model": "large-v2",
"audio": audio_url,
"word_timestamps": True,
"language": language,
"logprob_threshold": None,
"no_speech_threshold": None,
"condition_on_previous_text": True,
"temperature": 0.0,
"initial_prompt": transliterated_words,
}
})
if not response or "transcription" not in response or "word_timestamps" not in response:
raise RunpodResponseEmptyError()
log.info("Transcribed audio", response=response)
return response["transcription"], response["word_timestamps"]
被称为
@celery.task(bind=True)
def transcribe_runpod_task(self, audio_url: str, primary_language: str):
return transcribe_runpod(audio_url, primary_language)
现在,较短的音频文件效果很好,但这个较大文件的某些问题导致了看起来超时的情况。
此问题出现在 celery[redis] 5.4.0 以及 5.5.0rc1 上。我想知道我是否缺少某种配置?
使用 Redis 后端运行大约 5 分钟或更长时间的长时间 celery 任务时,您会遇到超时问题。由于连接超时或处理长任务持续时间的限制,Redis 后端似乎无法存储结果。
为了解决这个问题,您可以定义一个具有强大重试逻辑的可重用的基本 celery 任务类,然后在任务定义中使用它。您可以执行以下操作
首先,创建一个处理重试的基类,并包含用于记录成功、失败和重试尝试的钩子:
from celery import Task
import logging
log = logging.getLogger(__name__)
class BaseTaskWithRetry(Task):
autoretry_for = (Exception,)
retry_backoff = True
retry_backoff_max = 60
retry_jitter = True
acks_late = True
def on_retry(self, exc, task_id, args, kwargs, einfo):
log.warning(f"Retrying task {task_id} due to {exc}. Args: {args}, Kwargs: {kwargs}")
def on_failure(self, exc, task_id, args, kwargs, einfo):
log.error(f"Task {task_id} failed due to {exc}. Args: {args}, Kwargs: {kwargs}", exc_info=True)
def on_success(self, retval, task_id, args, kwargs):
log.info(f"Task {task_id} succeeded. Return value: {retval}. Args: {args}, Kwargs: {kwargs}")
现在,使用 BaseTaskWithRetry 类来定义你的 celery 任务:
from celery import shared_task
@shared_task(bind=True, base=BaseTaskWithRetry, soft_time_limit=1800, time_limit=1850)
def transcribe_runpod_task(self, audio_url: str, primary_language: str):
"""
Celery task to transcribe audio using Runpod.
Automatically retries on failure.
"""
log.info(f"Starting transcription for {audio_url} with language {primary_language}")
try:
# Call the transcription function
response = transcribe_runpod(audio_url, primary_language)
# Log success using the base class method
self.on_success(response, self.request.id, (audio_url, primary_language), {})
return response
except Exception as exc:
log.error(f"Transcription failed: {exc}", exc_info=True)
raise self.retry(exc=exc)
要触发任务,可以使用以下代码:
transcribe_runpod_task.apply_async(
args=["https://example.com/audio.mp3", "en"],
countdown=20
)