当我从命令行运行 Celery 时,我只能看到与 Celery 对象位于同一文件中的任务,而看不到其他文件中的任务。
项目结构如下:
celery_test
celery_tasks
__init__.py
celery_app.py
async
__init__.py
tasks.py
marker
__init__.py
tasks.py
文件内容如下
celery_app.py
from __future__ import absolute_import
from celery import Celery
celery_application = Celery('celery_test', backend='redis://localhost', broker='redis://localhost')
@celery_application.task
def test_celery():
print 4
任何
tasks.py
文件都有类似这样的内容
async/tasks.py
from __future__ import absolute_import
import time
from celery_tasks.celery_app import celery_application
@celery_application.task
def async_test():
print 'Start async_test'
time.sleep(3)
print 'Finish async_test'
当我按如下方式运行 Celery 时
celery --app=celery_tasks.celery_app:celery_application worker -l debug
我得到以下信息
-------------- celery@LAPTOP-HCR4G00Q v3.1.25 (Cipater)
---- **** -----
--- * *** * -- Windows-10-10.0.16299
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: celery_test:0x6ff3f28
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: redis://localhost/
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. celery.backend_cleanup
. celery.chain
. celery.chord
. celery.chord_unlock
. celery.chunks
. celery.group
. celery.map
. celery.starmap
. celery_tasks.celery_app.test_celery
这只是与应用程序位于同一文件中的任务。
对于如何解决这个问题有什么建议吗?我确实需要按主题分隔任务,因为它们很多,因此它们位于单个文件中。
我花了很多时间写这个问题,我刚刚解决了它,所以我分享解决方案,因为关于它的信息不多(或者至少我没有找到它)。
定义 Celery 对象后,我尝试自动发现任务,但没有成功。我的最后一次尝试是更改应用程序的名称并通过以下方式强制检测:
celery_application.autodiscover_tasks(['celery_tasks.async', 'celery_tasks.marker'], force=True)
然后从
celery_test/
运行:
celery --app=celery_tasks.celery_app:celery_application worker -l info
这解决了我的问题。我希望这对你有帮助
我用celery做了一个项目,最重要的是你要注意文件结构。
from celery import Celery
celery_app = Celery(
'worker',
broker='redis://redis:6379/0',
backend='redis://redis:6379/0')
celery_app.autodiscover_tasks(['tasks.uploads'], force=True)
import logging
from fastapi import Depends
from config.celery.celery_config import celery_app
from service.s3.s3_config import S3Config
from service.s3.s3_service import S3Service
@celery_app.task
def upload_file_to_s3(file_content: bytes, filename: str,content_type:
str):
"""Upload a file to S3 using the S3Service.
Args:
file_content (bytes): The content of the file to be uploaded.
filename (str): The name of the file to be uploaded.
Returns:
str: The path of the uploaded file.
:param filename:
:param content_type:
"""
s3_config = S3Config()
s3_service = S3Service(s3_config)
try:
uploaded_file_path = s3_service.upload_file(filename,
file_content,content_type)
return uploaded_file_path
except Exception as e:
logging.error(f"Error during file upload: {e}")
raise
此外,您必须在 init.py 文件中添加该行
from tasks.uploads.upload_tasks import upload_file_to_s3
所以基本上有两件事非常重要。如果您在不同的文件中编写 celery 任务,则必须添加 init.py 并添加该导入,以便 celery 配置可以看到该文件。