Celery worker not picking up tasks when running in a Docker container


I'm running into an issue where my Celery worker doesn't pick up tasks when it runs inside a Docker container.

I'm using Flask and Celery.

These are the logs when I run the worker without Docker:

celery@<hostname> v4.4.2 (cliffs)

Darwin-18.2.0-x86_64-i386-64bit 2020-05-26 22:16:40

[config]
.> app:         __main__:0x111343470
.> transport:   redis://localhost:6379//
.> results:     redis://localhost:6379/
.> concurrency: 8 (prefork)
.> task events: ON

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . load_data.scraping.tasks.scrape_the_data_daily
  . scrape the data daily

You can clearly see that my worker has found the tasks, but it isn't executing the periodic task.
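
The task definition looks something like this (a minimal sketch; only the module path and function name come from the [tasks] list above, the decorator form and body are placeholders):

# load_data/scraping/tasks.py -- minimal sketch, not the real scraping code
from apis.celery_app import app

@app.task
def scrape_the_data_daily():
    """Scrape the data once per beat tick."""
    ...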

When I run the same command in Docker, this is what I get:

celery-worker_1  | /usr/local/lib/python3.6/site-packages/celery/platforms.py:801: RuntimeWarning: You're running the worker with superuser privileges: this is
celery-worker_1  | absolutely not recommended!
celery-worker_1  | 
celery-worker_1  | Please specify a different user using the --uid option.
celery-worker_1  | 
celery-worker_1  | User information: uid=0 euid=0 gid=0 egid=0
celery-worker_1  | 
celery-worker_1  |   uid=uid, euid=euid, gid=gid, egid=egid,
celery-worker_1  | [2020-05-26 18:54:02,088: DEBUG/MainProcess] | Worker: Preparing bootsteps.
celery-worker_1  | [2020-05-26 18:54:02,090: DEBUG/MainProcess] | Worker: Building graph...
celery-worker_1  | [2020-05-26 18:54:02,092: DEBUG/MainProcess] | Worker: New boot order: {Timer, Hub, Pool, Autoscaler, StateDB, Beat, Consumer}

So it looks like the app and the tasks aren't being found.

But if I run the command from inside the Docker container, I can see that my tasks are found.
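
One way to double-check this from inside the container is a tiny script like the following (a sketch; the file name check_registered.py and the 5-second timeout are my choices, not from the project):

# check_registered.py -- run inside the container with the same
# CELERY_BROKER_URL so the broadcast reaches the dockerized worker.
from apis.celery_app import app

# Ask any running workers over the broker which tasks they have registered;
# registered() returns None if no worker replies within the timeout.
insp = app.control.inspect(timeout=5.0)
print(insp.registered() or "no workers replied")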

Here is how my docker-compose is set up:

  web:
    image: apis
    build: .
    command: uwsgi --http 0.0.0.0:5000 --module apis.wsgi:app
    env_file:
      - ./.env
    environment:
      - POSTGRES_HOST=db
      - CELERY_BROKER_URL=redis://redis:6379
      - CELERY_RESULT_BACKEND_URL=redis://redis:6379
    volumes:
      - ./apis:/code/apis
      - ./tests:/code/tests
      - ./load_data:/code/load_data
      - ./db/:/db/
    ports:
      - "5000:5000"
    links: 
      - redis
  redis:
    image: redis
  celery-beat:
    image: apis
    command: "celery -A apis.celery_app:app beat -S celerybeatredis.schedulers.RedisScheduler --loglevel=info"
    env_file:
      - ./.env
    depends_on:
      - redis
    links: 
      - redis
    environment:
      - CELERY_BROKER_URL=redis://redis:6379
      - CELERY_RESULT_BACKEND_URL=redis://redis:6379
      - CELERY_REDIS_SCHEDULER_URL=redis://redis:6379
      - C_FORCE_ROOT=true
    volumes:
      - ./apis:/code/apis
      - ./tests:/code/tests
      - ./load_data:/code/load_data
      - ./db/:/db/
    shm_size: '64m'
  celery-worker:
    image: apis
    command: "celery worker -A apis.celery_app:app --loglevel=debug -E"
    env_file:
      - ./.env
    depends_on:
      - redis
      - celery-beat
    links: 
      - redis
    environment:
      - CELERY_BROKER_URL=redis://redis:6379
      - CELERY_RESULT_BACKEND_URL=redis://redis:6379
      - CELERY_REDIS_SCHEDULER_URL=redis://redis:6379
      - C_FORCE_ROOT=true
    volumes:
      - ./apis:/code/apis
      - ./tests:/code/tests
      - ./load_data:/code/load_data
      - ./db/:/db/
    shm_size: '64m'
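
Both Celery services mount the code the same way as web, so a quick way to confirm that the task modules are importable with the container's working directory and PYTHONPATH is a script like this (a sketch; import_check.py is a name I made up), run with the container's Python:

# import_check.py -- try importing the app and task modules exactly as
# Celery would, and print the reason if either import fails.
import importlib

for mod in ("apis.celery_app", "load_data.scraping.tasks"):
    try:
        importlib.import_module(mod)
        print(f"OK   {mod}")
    except Exception as exc:
        print(f"FAIL {mod}: {exc!r}")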

The Celery setup looks like this...

from apis.app import init_celery
from celery.schedules import crontab
from apis.config import CELERY_REDIS_SCHEDULER_KEY_PREFIX, CELERY_REDIS_SCHEDULER_URL
from celery.task.control import inspect  # old-style import path, still available on 4.x

app = init_celery()

# Add the task and model modules to the import list so their tasks register.
app.conf.imports = app.conf.imports + ("load_data.scraping.tasks",)
app.conf.imports = app.conf.imports + ("apis.models.address", )

app.conf.beat_schedule = {
    'get-data-every-day': {
        'task': 'load_data.scraping.tasks.scrape_the_data_daily',
        # crontab(minute='*/5') fires every five minutes, not once a day.
        'schedule': crontab(minute='*/5'),
    },
}
app.conf.timezone = 'UTC'
app.conf.CELERY_REDIS_SCHEDULER_URL = CELERY_REDIS_SCHEDULER_URL
app.conf.CELERY_REDIS_SCHEDULER_KEY_PREFIX = CELERY_REDIS_SCHEDULER_KEY_PREFIX

# Broadcast to running workers at import time and print their registered tasks.
i = inspect()
print(10*"===", i.registered_tasks())
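
One thing worth noting about that module-level inspect() call: it broadcasts over the broker every time the module is imported, so beat, the worker, and the web app each pause for the broadcast timeout (one second by default) while booting. A sketch of the same check behind a guard:

# Only broadcast when the module is run directly, not on every import.
if __name__ == "__main__":
    insp = app.control.inspect(timeout=5.0)
    print(10 * "===", insp.registered_tasks())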

And Celery is initialized like this:

def init_celery(app=None):
    # `celery` is a module-level Celery instance and `create_app` the Flask
    # application factory; both live in apis.app.
    app = app or create_app()
    celery.conf.broker_url = app.config["CELERY_BROKER_URL"]
    celery.conf.result_backend = app.config["CELERY_RESULT_BACKEND"]
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        """Make celery tasks work with Flask app context"""

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery
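
For completeness, init_celery() relies on a module-level celery instance and a create_app() factory defined in apis.app. A minimal sketch of what that module is assumed to contain (mapping the CELERY_RESULT_BACKEND_URL environment variable from docker-compose into Flask's CELERY_RESULT_BACKEND config key is my assumption):

# apis/app.py -- minimal sketch; the real project defines these elsewhere.
import os

from celery import Celery
from flask import Flask

# Module-level Celery instance that init_celery() configures later.
celery = Celery(__name__)

def create_app():
    # Assumed: the factory copies the broker/backend URLs from the
    # environment variables set in docker-compose into Flask's config.
    app = Flask(__name__)
    app.config["CELERY_BROKER_URL"] = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379")
    app.config["CELERY_RESULT_BACKEND"] = os.environ.get("CELERY_RESULT_BACKEND_URL", "redis://localhost:6379")
    return app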

Basically, I have two questions.

  1. Why aren't the tasks found when the worker runs inside the Docker container?
  2. Why aren't my tasks running?

Any ideas are welcome.
