我想创建两个芹菜队列(用于不同类型的任务)
我的芹菜配置。我希望这个配置创建两个队列
'celery' and 'celery:1'
# celery.py
import os
from celery import Celery
from core_app.settings import INSTALLED_APPS
# this code copied from manage.py
# set the default Django settings module for the 'celery' app.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core_app.settings')
app = Celery("my_app")
# To start scheduling tasks based on priorities
# you need to configure queue_order_strategy transport option.
# ['celery', 'celery:1',] - two queues, the highest priority queue will be named celery
app.conf.broker_transport_options = {
'priority_steps': list(range(2)),
'sep': ':',
'queue_order_strategy': 'priority',
}
# read config from Django settings, the CELERY namespace would make celery
# config keys has `CELERY` prefix
app.config_from_object('django.conf:settings', namespace='CELERY')
# load tasks.py in django apps
app.autodiscover_tasks(lambda: INSTALLED_APPS)
我正在定义并运行这样的 celery 任务:
@shared_task(queue="celery", soft_time_limit=600, time_limit=650)
def check_priority_urls(parsing_result_ids: List[int]):
check_urls(parsing_result_ids)
@shared_task(queue="celery:1", soft_time_limit=600, time_limit=650)
def check_common_urls(parsing_result_ids: List[int]):
check_urls(parsing_result_ids)
# running task
check_priority_urls.delay(parsing_results_ids)
但我只看到一个队列
celery -A core_app inspect active_queues
-> celery@d1a287d1d3b1: OK
* {'name': 'celery', 'exchange': {'name': 'celery', 'type': 'direct', 'arguments': None, 'durable': True, 'passive': False, 'auto_delete': False, 'delivery_mode': None, 'no_declare': False}, 'routing_key': 'celery', 'queue_arguments': None, 'binding_arguments': None, 'consumer_arguments': None, 'durable': True, 'exclusive': False, 'auto_delete': False, 'no_ack': False, 'alias': None, 'bindings': [], 'no_declare': None, 'expires': None, 'message_ttl': None, 'max_length': None, 'max_length_bytes': None, 'max_priority': None}
1 node online.
我的 docker-compose 中的芹菜
celery:
build: ./project
command: celery -A core_app worker --loglevel=info --concurrency=15 --max-memory-per-child=1000000
volumes:
- ./project:/usr/src/app
- ./project/media:/project/media
- ./project/logs:/project/logs
env_file:
- .env
environment:
# environment variables declared in the environment section override env_file
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- django
- redis
你的情况意味着你的工作人员实际上并没有在第二个队列上监听,
默认情况下,Celery Worker 将仅侦听“默认”队列,除非您明确配置或指示它这样做
你可以做这样的事情
from celery import Celery
from kombu import Queue, Exchange
app = Celery("my_app")
# Declare your queues explicitly
app.conf.task_queues = (
Queue('celery', Exchange('celery'), routing_key='celery'),
Queue('celery:1', Exchange('celery'), routing_key='celery:1'),
)
# (Optional) If you want to direct certain tasks to certain queues,
# you can specify task_routes.
app.conf.task_routes = {
'path.to.check_priority_urls': {'queue': 'celery'},
'path.to.check_common_urls': {'queue': 'celery:1'},
}
# You can keep the rest of your config as is
您还需要确保您的工作人员监听两个队列
celery -A core_app worker --loglevel=info --concurrency=15 \
--max-memory-per-child=1000000 \
-Q celery,celery:1
在你的 docker 文件中你需要添加这个comamand 确保在命令中添加队列
command: >
celery -A core_app worker
--loglevel=info
--concurrency=15
--max-memory-per-child=1000000
-Q celery,celery:1