在 django docker 容器中运行 celery 应用程序

问题描述 投票:0回答:2

我有一个使用 docker 和 docker-compose 的 Django 项目,我想在 Django 项目中使用 Celery。 我按照 Celery 文档创建了一个应用程序、一个任务并将其与 RabbitMQ 代理连接起来。

这是我的 celery.py 文件,与 settings.py 文件位于同一目录中:

import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "proj.settings")

backtest_app = Celery("backtest_app")

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
backtest_app.config_from_object("django.conf:settings", namespace="CELERY")

# Load task modules from all registered Django apps.
backtest_app.autodiscover_tasks()

我还在settings.py 文件中设置了 Celery 应用程序配置:

# CELERY config
CELERY_BROKER_URL = "amqp://rabbitmq:5672"
CELERY_BROKER_USER = "user"
CELERY_BROKER_PASSWORD = "pass"
CELERY_BACKEND = "rpc://"
CELERY_BROKER_POOL_LIMIT = 100
CELERY_TASK_DEFAULT_QUEUE = "backtest_queue"
CELERY_TASK_DEFAULT_EXCHANGE = "backtest_exchange"
CELERY_TASK_DEFAULT_ROUTING_KEY = "backtest_queue"
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = "UTC"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60
CELERY_RESULT_BACKEND = "django-db"
CELERY_CACHE_BACKEND = "django-cache"

现在,我在后端的 Dockerfile 中有这段代码:

FROM python:3.10.8-alpine

# Prevents Python from writing pyc files to disc:
ENV PYTHONDONTWRITEBYTECODE 1
# Prevents Python from buffering stdout and stderr
ENV PYTHONUNBUFFERED 1

# Instal pip
# RUN apk update && apk upgrade && \
#     apk add py-pip
RUN apk update && apk upgrade \
    && apk add gcc python3-dev musl-dev build-base gcc wget
# install dependencies
RUN pip install --upgrade pip
# TA-Lib
RUN wget http://prdownloads.sourceforge.net/ta-lib/ta-lib-0.4.0-src.tar.gz && \
  tar -xvzf ta-lib-0.4.0-src.tar.gz && \
  cd ta-lib/ && \
  ./configure --prefix=/usr && \
  make && \
  make install

RUN pip install TA-Lib==0.4.25
RUN rm -R ta-lib ta-lib-0.4.0-src.tar.gz

RUN mkdir /worker
WORKDIR /worker
COPY . .

# Create a directory for logs
RUN mkdir -p /app/logs
# # Installation der erforderlichen Python-Pakete
# RUN pip install --upgrade pip

RUN pip install -r ./requirements.txt

我也将此代码作为 docker-compose 文件中的后端服务:

webapp_backend:
    build: ./webapp_backend
    command: sh -c "python manage.py migrate --noinput && gunicorn proj.wsgi --bind 0.0.0.0:80"
    volumes:
      - backend_data:/webapp_backend/media/
    env_file:
      - webapp_backend/.env
    depends_on:
      - db
      - rabbitmq

现在,我想在后端容器中运行 Celery 应用程序,以便它可以捕获任务。但将其放入 webapp_backend 服务中的“命令”中,不会启动工作程序,因为终端已经忙于运行

gunicorn proj.wsgi --bind 0.0.0.0:80
命令。

我能想到的一种方法(但我无法实现)是在 webapp_backend 容器中创建两个单独的终端,两个终端在那里运行

celery -A proj worker --loglevel=info
命令。如果这个解决方案有效,请帮助我实现这一目标。但如果这不是有效的解决方案,请提出一个解决方案。

另外我需要提的是,ChatGPT 给了我一种方法,虽然它有效,但不是一个好的方法。 它建议有另一个服务,名为“celery_worker”,如下所示:

  # celery_worker:
  #   build: ./webapp_backend
  #   command: celery -A proj worker --loglevel=info
  #   volumes:
  #     - backend_data:/webapp_backend/media/
  #   env_file:
  #     - webapp_backend/.env
  #   depends_on:
  #     - webapp_backend

但是正如你所看到的,这会构建另一个与 webapp_backend 完全相同的容器,并将项目的所有代码复制到这个新容器中,只是为了启动工作程序,如果我理解正确并且它实际上复制了整个容器,那么它这样做根本不是一个好方法。虽然它有效并且任务已完成,但我正在寻找另一种方法。

django linux docker rabbitmq celery
2个回答
1
投票

Celery 程序自行管理进程,并且通过单独的命令更容易设置。通常,只有一个程序在 docker 容器内运行,该容器管理所有 UNIX 进程,以便在扩展期间提供更好的信号处理机制。例如,预分叉模式下的gunicorn会生成子进程,它通过主进程(PID 1)控制子进程。

Celery 应在单独的进程中运行,最好在单独的容器中运行,以实现信号处理机制,以避免 ChatGPT 建议的意外问题。但是您可以构建您的应用程序,这样不必要的繁重库就不会导入到 django/celery 应用程序中,并使用 signatures 将函数作为字符串传递。

您还可以在单个容器中运行多个进程,其中一个进程采用 PID 1,而其他进程则在 background 中运行,以节省资源,尽管这不是推荐的方法。这些库带有内置的 POSIX 信号处理机制,将这些信号发送到这些应用程序非常重要。

或者,您可以查看 apscheduler 是否满足您的需求。它使用您的操作系统而不是外部代理来管理通信,并且无法单独自动扩展。


0
投票

对于任何对此感到非常疯狂的人:

`kombu.exceptions.OperationalError: [Errno 111] Connection refused` 

即使您确保将所有必需的代码添加到主项目

celery.py
__iniy__.py
settings.py
以及应用程序文件夹中的
tasks.py
文件。

我已经尝试了近35个小时来测试用shared_task修饰的函数,并且总是收到此错误。但问题是我正在以两种方式启动的 shell 中尝试它:

  1. python
  2. django-admin shell

两者都会因某种我不太清楚的原因而失败。

你必须确保你的

function.delay()
是从典型的 django 执行中调用的(比如从视图中),或者你需要启动一个 python shell 来通过
python manage.py shell

手动测试你的函数调用
© www.soinside.com 2019 - 2024. All rights reserved.