我正在尝试草拟自动缩放功能对具有基于复杂链的工作流的芹菜的工作方式。
当前流量:
下载(每6小时)下载30份CSV,然后创建一个芹菜组来监视各个线程,在这些线程中对每个CSV进行预处理并保存在/ tmp / folder /上
所有任务成功完成并且celery组返回True后,将/ tmp / folder /压缩并存储在S3上,并通过API调用通知其他系统。
面临的挑战:
我们有大约40至50个任务的待处理任务,这使整个过程确实很慢。
提议的解决方案:
自动缩放,即根据待处理任务的数量添加更多工作服务器。
这种方法对于我拥有的工作流程是否可能?还是有可能采用垂直缩放解决方案?解决此问题的最佳方法是什么?
@app.task
def process_csv(path_of_csv):
# preprocessing the csv and storing in /tmp/folder/
return True
res = group(process_csv.s(path) for path in all_paths)()
with allow_join_result():
print(res.get())
if 'False' not in res.get():
# Time to store to S3
环境信息
aiodns==2.0.0
aiohttp==3.5.4
amqp==2.5.0
async-timeout==3.0.1
attrs==19.1.0
Babel==2.7.0
billiard==3.5.0.5
boto3==1.9.197
botocore==1.12.197
celery==4.1.1
certifi==2019.6.16
cffi==1.12.3
chardet==3.0.4
ddtrace==0.31.0
Django==2.2.3
django-enumfields==1.0.0
django-extensions==2.2.1
djangorestframework==3.10.1
docutils==0.14
flower==0.9.3
gevent==1.4.0
greenlet==0.4.15
gunicorn==19.9.0
idna==2.8
idna-ssl==1.1.0
jmespath==0.9.4
kombu==4.6.3
multidict==4.5.2
psutil==5.6.5
psycopg2-binary==2.8.3
pycares==3.0.0
pycparser==2.19
python-dateutil==2.8.0
pytz==2019.1
redis==3.3.0
requests==2.22.0
s3transfer==0.2.1
six==1.12.0
slackclient==2.0.0
sqlparse==0.3.0
tornado==5.1.1
typing==3.7.4
typing-extensions==3.7.4
urllib3==1.25.3
vine==1.3.0
websocket==0.2.1
websocket-client==0.56.0
Werkzeug==0.15.5
yarl==1.3.0
我正在使用RabbitMQ经纪人和Redis作为后端。
谢谢
如果我理解正确,则涉及4个步骤
下载CSV
处理CSV
Zip / tmp / folder
上传到云(S3)
瓶颈在步骤2。
您已经提到40-50个待处理的任务,假设您要为每个任务处理一个文件,则该文件应等于或小于30(CSV文件数)。为什么会有40-50个任务?
我不清楚。您能解释一下为什么任务要比正在处理的文件多吗?
在探索自动缩放之前,您可以探索--concurrency
,将其设置为30或更大并尝试查找是否仍然存在问题,这将有助于找到瓶颈。
https://docs.celeryproject.org/en/latest/userguide/workers.html#concurrency
顺便说一句,又看到了一个线程,芹菜4.0中的自动缩放似乎已损坏(您是celery==4.1.1
),不确定。检查线程:https://github.com/celery/celery/issues/4003