Celery executes tasks sequentially, one after the other


I have a Django application with a large number of I/O-bound tasks.

I use Celery to run these tasks in threads, and I track their progress in the UI with a progress bar.

Here is my setup:

Django version: 5.0.2

Celery version: 5.3.6

Redis version: Redis for Windows 5.0.14.1 (https://github.com/tporadowski/redis/releases)

Server:

Windows Server 2016 (cannot be changed; my data is stored in an Access database)

Application hosted in the default IIS application pool

CPU: 4 cores

RAM: 4 GB

web.config:

<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <system.webServer>
    <handlers>
      <add name="Python FastCGI" path="*" verb="*" modules="FastCgiModule" scriptProcessor="C:\Python311\python.exe|C:\Python311\Lib\site-packages\wfastcgi.py" resourceType="Unspecified" requireAccess="Script" />
    </handlers>
    <directoryBrowse enabled="true" />
  </system.webServer>

  <appSettings>
    <add key="PYTHONPATH" value="C:\inetpub\Django-LIAL\WEBAPPLIAL" />
    <add key="WSGI_HANDLER" value="WEBAPPLIAL.wsgi.application" />
    <add key="DJANGO_SETTINGS_MODULE" value="WEBAPPLIAL.settings" />
  </appSettings>
</configuration>

Django wsgi configuration:

from gevent import monkey
# Monkey-patch as early as possible, before any other imports.
# Note: this patches the web (WSGI) process only; the Celery worker
# is a separate process started from the command line.
monkey.patch_all()
import os

from django.core.wsgi import get_wsgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'WEBAPPLIAL.settings')

application = get_wsgi_application()

Django Celery configuration:

# Celery settings
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'

CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'django-cache'
CELERY_TASK_ALWAYS_EAGER = False
CELERY_TASK_TRACK_STARTED = True
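
For context, these CELERY_-prefixed settings are normally picked up through a WEBAPPLIAL/celery.py app module. A minimal sketch of what that module presumably looks like, following the standard django-celery layout (the debug_task entry in the worker's task list below suggests exactly this shape):

# WEBAPPLIAL/celery.py -- minimal sketch, assuming the standard
# "First steps with Django" layout from the Celery documentation.
import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'WEBAPPLIAL.settings')

app = Celery('WEBAPPLIAL')

# Read every CELERY_*-prefixed Django setting shown above.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Find tasks.py modules in all installed Django apps.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')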

Command line used to start Celery (in Git Bash):

$ celery -A WEBAPPLIAL worker -l info -P gevent

Output of the celery command:

-------------- celery@WIN-RHK2AHPNGJ1 v5.3.6 (emerald-rush)
--- ***** -----
-- ******* ---- Windows-10-10.0.14393-SP0 2024-05-17 12:05:49
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         WEBAPPLIAL:0x17207492650
- ** ---------- .> transport:   redis://127.0.0.1:6379/0
- ** ---------- .> results:
- *** --- * --- .> concurrency: 4 (gevent)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . APPLICATION.A13.A13_LOG_0002.model.task.extract_data
  . APPLICATION.A13.A13_LOG_0005.tasks.launch_app
  . WEBAPPLIAL.celery.debug_task

[2024-05-17 12:05:49,995: WARNING/MainProcess] C:\Python311\Lib\site-packages\celery\worker\consumer\consumer.py:507: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-05-17 12:05:50,010: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2024-05-17 12:05:50,010: WARNING/MainProcess] C:\Python311\Lib\site-packages\celery\worker\consumer\consumer.py:507: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-05-17 12:05:50,026: INFO/MainProcess] mingle: searching for neighbors
[2024-05-17 12:05:51,048: INFO/MainProcess] mingle: all alone
[2024-05-17 12:05:51,048: WARNING/MainProcess] C:\Python311\Lib\site-packages\celery\worker\consumer\consumer.py:507: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-05-17 12:05:51,048: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/0.
[2024-05-17 12:05:51,063: INFO/MainProcess] celery@WIN-RHK2AHPNGJ1 ready.
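
(Side note: the CPendingDeprecationWarning repeated in this log says how to silence it: opt in to the new startup-retry behavior. Assuming the settings are loaded with namespace='CELERY' as sketched above, that would be one more line in settings.py:)

# Keep retrying the broker connection on startup (silences the
# CPendingDeprecationWarning above); maps to the
# broker_connection_retry_on_startup setting named in the warning.
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True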

A quick look at my functions:

from celery import shared_task

@shared_task(bind=True)
def launch_app(self, laiteries, formated_date):
    ...  # body omitted


@shared_task(bind=True)
def extract_data(self, date_start, date_end):
    ...  # body omitted

Both of them are called with .delay(). Each function interacts with the Django ORM, but on different models. A dispatch sketch follows below.
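
For illustration, the dispatch from the web app presumably looks something like this (the view names and request parameters are hypothetical; only the task import paths, the argument names, and the .delay() calls are taken from the question):

# views.py -- hypothetical sketch of how the two tasks are queued.
from django.http import JsonResponse

from APPLICATION.A13.A13_LOG_0002.model.task import extract_data
from APPLICATION.A13.A13_LOG_0005.tasks import launch_app


def start_extract(request):
    # .delay() queues the task and returns immediately.
    result = extract_data.delay(request.GET['date_start'],
                                request.GET['date_end'])
    return JsonResponse({'task_id': result.id})


def start_launch(request):
    result = launch_app.delay(request.GET.getlist('laiteries'),
                              request.GET['formated_date'])
    return JsonResponse({'task_id': result.id})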


Actual behavior

When I start the first task (by interacting with the web app) and immediately start the second one, here is what happens:

[2024-05-17 12:06:28,464: INFO/MainProcess] Task APPLICATION.A13.A13_LOG_0002.model.task.extract_data[baf19fc9-dd9c-4574-af8d-c7ed9a522c0e] received
[2024-05-17 12:06:56,144: INFO/MainProcess] Task APPLICATION.A13.A13_LOG_0002.model.task.extract_data[baf19fc9-dd9c-4574-af8d-c7ed9a522c0e] succeeded in 27.60899999999998s: 'Procédure terminée !'
[2024-05-17 12:06:56,159: INFO/MainProcess] Task APPLICATION.A13.A13_LOG_0005.tasks.launch_app[435df153-9879-47a4-93ba-5ba9ed90cf76] received
[2024-05-17 12:07:01,662: INFO/MainProcess] Task APPLICATION.A13.A13_LOG_0005.tasks.launch_app[435df153-9879-47a4-93ba-5ba9ed90cf76] succeeded in 5.5s: 'Tout les emails ont bien été envoyer !'

The problem: Celery executes the tasks sequentially instead of in parallel.

The behavior I expected looks like this:

[2024-05-17 12:06:28,464: INFO/MainProcess] Task APPLICATION.A13.A13_LOG_0002.model.task.extract_data[baf19fc9-dd9c-4574-af8d-c7ed9a522c0e] received
[2024-05-17 12:06:29,159: INFO/MainProcess] Task APPLICATION.A13.A13_LOG_0005.tasks.launch_app[435df153-9879-47a4-93ba-5ba9ed90cf76] received
[2024-05-17 12:07:34,662: INFO/MainProcess] Task APPLICATION.A13.A13_LOG_0005.tasks.launch_app[435df153-9879-47a4-93ba-5ba9ed90cf76] succeeded in 5.5s: 'Tout les emails ont bien été envoyer !'
[2024-05-17 12:06:56,144: INFO/MainProcess] Task APPLICATION.A13.A13_LOG_0002.model.task.extract_data[baf19fc9-dd9c-4574-af8d-c7ed9a522c0e] succeeded in 27.60899999999998s: 'Procédure terminée !'

If you need more details, just ask!

python python-3.x django celery gevent
1 Answer

0 votes

Somehow, switching from gevent to threads fixes the problem. A plausible explanation, not stated in the original answer: the Celery worker process is never monkey-patched (the monkey.patch_all() above lives in the WSGI module, which only the web process imports), so blocking I/O calls such as the ODBC driver talking to the Access database hold gevent's single event-loop thread, and the greenlets end up running one after another. Native OS threads have no such constraint.

$ celery -A WEBAPPLIAL worker -l info -P threads -c 100
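
A quick way to confirm that the thread pool really runs tasks concurrently is a pair of throwaway sleeping tasks (hypothetical names, not part of the original app): queue both back to back, and both should finish about ten seconds after being received instead of the second one waiting for the first.

# tasks.py -- hypothetical smoke test for worker concurrency.
import time

from celery import shared_task


@shared_task
def slow_one():
    time.sleep(10)  # simulate blocking I/O
    return 'one done'


@shared_task
def slow_two():
    time.sleep(10)
    return 'two done'

# From `python manage.py shell`:
#   slow_one.delay(); slow_two.delay()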