我正在运行一个烧瓶应用程序,该应用程序可以接收偶发的但繁重的工作。我实现了一系列docker容器,它们运行flask应用程序,celery,redis(作为代理)和memcached(作为后端)。我使用芹菜将加工重量分叉,然后使用get()检索所有结果:
# Multithreading
jobs = group(processing_fn.s(c) for c in chunks)
result = jobs.apply_async()
while not result.ready() :
time.sleep(30)
resultset = result.get()
虽然此方法可以快速而正常地工作,但我在订购芹菜后在使用它们以释放用于存储任务结果的RAM时遇到了麻烦。 最终,服务器内存不足,必须重新启动,这远非最佳。
我尝试在结果集(甚至结果集中的每个结果)上使用.forget
:
result = result.get()
result.forget()
...
resultset = result.get()
for r in result :
r.forget()
但是,这些都没有释放记忆...有什么想法吗?
这是celery应用的实例化方式:
from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
broker = "redis://redis:6379/",
backend = "cache+memcached://memcached:11211"
)
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
...
celery = make_celery(app)
默认情况下,芹菜将任何任务的结果存储1天:
请参阅:您可以通过以下方式对其进行调整:CELERY_TASK_RESULT_EXPIRES或https://docs.celeryproject.org/en/latest/userguide/configuration.html#result-expires