Unable to create a second Celery queue


I want to create two Celery queues (for different types of tasks).

This is my Celery configuration. I expect it to create two queues: 'celery' and 'celery:1'.

# celery.py

import os
from celery import Celery

from core_app.settings import INSTALLED_APPS

# this code copied from manage.py
# set the default Django settings module for the 'celery' app.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core_app.settings')

app = Celery("my_app")

# To start scheduling tasks based on priorities 
# you need to configure queue_order_strategy transport option.
# ['celery', 'celery:1',] - two queues, the highest priority queue will be named celery
app.conf.broker_transport_options = {
    'priority_steps': list(range(2)),
    'sep': ':',
    'queue_order_strategy': 'priority',
}

# read config from Django settings; with the CELERY namespace, all Celery
# config keys in settings must carry a `CELERY_` prefix
app.config_from_object('django.conf:settings', namespace='CELERY')

# load tasks.py in django apps
app.autodiscover_tasks(lambda: INSTALLED_APPS)
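
For reference, the two names in the comment above follow from `priority_steps` and `sep`. The sketch below reproduces that naming rule in plain Python. It is not the transport's own code, and the helper name `priority_queue_names` is hypothetical. It assumes step 0 maps to the bare queue name and every other step is appended after the separator. On the Redis transport these names appear to be storage keys for the priority levels of a single queue, so they would not necessarily show up as separate entries in `inspect active_queues`.

```python
from typing import List


def priority_queue_names(base: str, steps: List[int], sep: str) -> List[str]:
    """Return one name per priority step (hypothetical helper, not a kombu API).

    Assumption: step 0 maps to the bare queue name, and every other step
    is rendered as "<base><sep><step>".
    """
    return [base if step == 0 else f"{base}{sep}{step}" for step in steps]


# Same values as in broker_transport_options above.
names = priority_queue_names("celery", list(range(2)), ":")
print(names)
```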

I define and run Celery tasks like this:

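from typing import List

from celery import shared_task


# check_urls is a project helper defined elsewhere
@shared_task(queue="celery", soft_time_limit=600, time_limit=650)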
def check_priority_urls(parsing_result_ids: List[int]):
    check_urls(parsing_result_ids)


@shared_task(queue="celery:1", soft_time_limit=600, time_limit=650)
def check_common_urls(parsing_result_ids: List[int]):
    check_urls(parsing_result_ids)

# running task
check_priority_urls.delay(parsing_results_ids)

But I only see one queue:

celery -A core_app inspect active_queues
->  celery@d1a287d1d3b1: OK
    * {'name': 'celery', 'exchange': {'name': 'celery', 'type': 'direct', 'arguments': None, 'durable': True, 'passive': False, 'auto_delete': False, 'delivery_mode': None, 'no_declare': False}, 'routing_key': 'celery', 'queue_arguments': None, 'binding_arguments': None, 'consumer_arguments': None, 'durable': True, 'exclusive': False, 'auto_delete': False, 'no_ack': False, 'alias': None, 'bindings': [], 'no_declare': None, 'expires': None, 'message_ttl': None, 'max_length': None, 'max_length_bytes': None, 'max_priority': None}

1 node online.

Flower also shows only one queue.

The Celery service in my docker-compose file:

celery:
    build: ./project
    command: celery -A core_app worker  --loglevel=info --concurrency=15 --max-memory-per-child=1000000
    volumes:
      - ./project:/usr/src/app
      - ./project/media:/project/media
      - ./project/logs:/project/logs
    env_file:
      - .env
    environment:
    # environment variables declared in the environment section override env_file
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - django
      - redis

python django celery
1 Answer

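Your situation suggests that your worker is not actually listening on the second queue.

By default, a Celery worker listens only to the default queue unless you explicitly configure or instruct it to do otherwise.

You can do something like this: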

from celery import Celery
from kombu import Queue, Exchange

app = Celery("my_app")

# Declare your queues explicitly
app.conf.task_queues = (
    Queue('celery',    Exchange('celery'), routing_key='celery'),
    Queue('celery:1',  Exchange('celery'), routing_key='celery:1'),
)

# (Optional) If you want to direct certain tasks to certain queues,
# you can specify task_routes.
app.conf.task_routes = {
    'path.to.check_priority_urls': {'queue': 'celery'},
    'path.to.check_common_urls':   {'queue': 'celery:1'},
}

# You can keep the rest of your config as is

You also need to make sure your worker listens on both queues:

celery -A core_app worker --loglevel=info --concurrency=15 \
       --max-memory-per-child=1000000 \
       -Q celery,celery:1

In your docker-compose file, update the command so that it includes the queues:

command: >
      celery -A core_app worker
      --loglevel=info
      --concurrency=15
      --max-memory-per-child=1000000
      -Q celery,celery:1 
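
After restarting the worker, one way to confirm the change is to compare the queues each worker reports against the ones you expect. The sketch below works on a reply shaped like the `inspect active_queues` output shown in the question. The helper name `missing_queues` and the trimmed sample reply are illustrative assumptions, not part of Celery.

```python
from typing import Dict, Iterable, List, Optional, Set


def missing_queues(
    reply: Optional[Dict[str, List[dict]]], expected: Iterable[str]
) -> Dict[str, Set[str]]:
    """Map each worker name to the expected queues it does not consume."""
    expected_set = set(expected)
    result: Dict[str, Set[str]] = {}
    # Treat a missing reply (no worker answered) as an empty mapping.
    for worker, queues in (reply or {}).items():
        consumed = {queue["name"] for queue in queues}
        result[worker] = expected_set - consumed
    return result


# Sample reply modeled on the output in the question (fields trimmed).
sample_reply = {
    "celery@d1a287d1d3b1": [{"name": "celery", "routing_key": "celery"}],
}
print(missing_queues(sample_reply, ["celery", "celery:1"]))
```

With a running worker, the reply could come from `app.control.inspect().active_queues()`, which returns `None` when no worker responds. An empty set for every worker means all expected queues are being consumed.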
