不同文件中的 Celery + Redis 任务

问题描述 投票:0回答:2

当我从命令行运行 Celery 时,我只能看到与 Celery 对象位于同一文件中的任务,而看不到其他文件中的任务。

项目结构如下:

celery_test
    celery_tasks
        __init__.py
        celery_app.py
        async
            __init__.py
            tasks.py
        marker
            __init__.py
            tasks.py

文件内容如下

celery_app.py

from __future__ import absolute_import
from celery import Celery

celery_application = Celery('celery_test', backend='redis://localhost', broker='redis://localhost')

@celery_application.task
def test_celery():
    print 4

任何

tasks.py
文件都有类似这样的内容

async/tasks.py

from __future__ import absolute_import
import time

from celery_tasks.celery_app import celery_application


@celery_application.task
def async_test():
    print 'Start async_test'
    time.sleep(3)
    print 'Finish async_test'

当我按如下方式运行 Celery 时

celery --app=celery_tasks.celery_app:celery_application worker -l debug

我得到以下信息

 -------------- celery@LAPTOP-HCR4G00Q v3.1.25 (Cipater)
---- **** -----
--- * ***  * -- Windows-10-10.0.16299
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         celery_test:0x6ff3f28
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
  . celery_tasks.celery_app.test_celery

这只是与应用程序位于同一文件中的任务。

对于如何解决这个问题有什么建议吗?我确实需要按主题分隔任务,因为它们很多,因此它们位于单个文件中。

python celery
2个回答
14
投票

我花了很多时间写这个问题,我刚刚解决了它,所以我分享解决方案,因为关于它的信息不多(或者至少我没有找到它)。

定义 Celery 对象后,我尝试自动发现任务,但没有成功。我的最后一次尝试是更改应用程序的名称并通过以下方式强制检测:

celery_application.autodiscover_tasks(['celery_tasks.async', 'celery_tasks.marker'], force=True)

然后从

celery_test/
运行:

celery --app=celery_tasks.celery_app:celery_application worker -l info

这解决了我的问题。我希望这对你有帮助


0
投票

我用celery做了一个项目,最重要的是你要注意文件结构。

enter image description here

from celery import Celery

celery_app = Celery(
'worker',
broker='redis://redis:6379/0',
backend='redis://redis:6379/0')

celery_app.autodiscover_tasks(['tasks.uploads'], force=True)

import logging
from fastapi import Depends
from config.celery.celery_config import celery_app
from service.s3.s3_config import S3Config
from service.s3.s3_service import S3Service


@celery_app.task
def upload_file_to_s3(file_content: bytes, filename: str,content_type: 
str):
"""Upload a file to S3 using the S3Service.

Args:
    file_content (bytes): The content of the file to be uploaded.
    filename (str): The name of the file to be uploaded.

Returns:
    str: The path of the uploaded file.
    :param filename:
    :param content_type:
"""
s3_config = S3Config()
s3_service = S3Service(s3_config)
try:
    uploaded_file_path = s3_service.upload_file(filename, 
file_content,content_type)
    return uploaded_file_path
except Exception as e:
    logging.error(f"Error during file upload: {e}")
    raise

此外,您必须在 init.py 文件中添加该行

from tasks.uploads.upload_tasks import upload_file_to_s3

所以基本上有两件事非常重要。如果您在不同的文件中编写 celery 任务,则必须添加 init.py 并添加该导入,以便 celery 配置可以看到该文件。

© www.soinside.com 2019 - 2024. All rights reserved.