我有一个像这样的芹菜任务:
from celery import Celery
from asgiref.sync import async_to_sync
from celery_app.celery_commands import some_async_command
import os
redis_connection_string = os.environ.get("REDIS_URL")
celery_app = Celery(
'tasks', backend=f"{redis_connection_string}/0", broker=f"{redis_connection_string}/1")
@celery_app.task()
def sample_task(**kwargs):
result = async_to_sync(some_async_command)(**kwargs)
return result
当我使用 run celery 来执行此任务时:
celery -A celery_app.tasks worker --loglevel=INFO -n my_app_worker --concurrency=4
,它工作正常。然而,由于异步函数 some_async_command
主要是 I/O 绑定并调用网络,所以我正在考虑使用 gevent 池。我已经安装了gevent,但是当我运行celery -A celery_app.tasks worker --loglevel=INFO -n my_app_worker --concurrency=4 --pool gevent
时,它抛出错误:
Usage: celery [OPTIONS] COMMAND [ARGS]...
Try 'celery --help' for help.
Error: Invalid value for '-A' / '--app':
Unable to load celery application.
Module 'select' has no attribute 'epoll'
我错过了什么?
gevent 的第一件事是执行猴子补丁以使标准库协作,以便它们表现为协程。
当您使用 gevent 池时,gevent 模块会将自身添加到 sys.path 标准库之前,并替换模块列表中已加载的模块。这使得您无需使用特定于 gevent 的模块即可运行 gevent。
我怀疑你使用urllib3?在这种情况下,urllib3 选择 gevent 的 select.py 而不是标准库选择模块,这解释了您所看到的错误。
有可能gevent没有尽早进行monkey patch或者与你使用的包版本有关