我有一个使用 docker 和 docker-compose 的 Django 项目,我想在 Django 项目中使用 Celery。 我按照 Celery 文档创建了一个应用程序、一个任务并将其与 RabbitMQ 代理连接起来。
这是我的 celery.py 文件,与 settings.py 文件位于同一目录中:
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "proj.settings")
backtest_app = Celery("backtest_app")
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
backtest_app.config_from_object("django.conf:settings", namespace="CELERY")
# Load task modules from all registered Django apps.
backtest_app.autodiscover_tasks()
我还在settings.py 文件中设置了 Celery 应用程序配置:
# CELERY config
CELERY_BROKER_URL = "amqp://rabbitmq:5672"
CELERY_BROKER_USER = "user"
CELERY_BROKER_PASSWORD = "pass"
CELERY_BACKEND = "rpc://"
CELERY_BROKER_POOL_LIMIT = 100
CELERY_TASK_DEFAULT_QUEUE = "backtest_queue"
CELERY_TASK_DEFAULT_EXCHANGE = "backtest_exchange"
CELERY_TASK_DEFAULT_ROUTING_KEY = "backtest_queue"
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = "UTC"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60
CELERY_RESULT_BACKEND = "django-db"
CELERY_CACHE_BACKEND = "django-cache"
现在,我在后端的 Dockerfile 中有这段代码:
FROM python:3.10.8-alpine
# Prevents Python from writing pyc files to disc:
ENV PYTHONDONTWRITEBYTECODE 1
# Prevents Python from buffering stdout and stderr
ENV PYTHONUNBUFFERED 1
# Instal pip
# RUN apk update && apk upgrade && \
# apk add py-pip
RUN apk update && apk upgrade \
&& apk add gcc python3-dev musl-dev build-base gcc wget
# install dependencies
RUN pip install --upgrade pip
# TA-Lib
RUN wget http://prdownloads.sourceforge.net/ta-lib/ta-lib-0.4.0-src.tar.gz && \
tar -xvzf ta-lib-0.4.0-src.tar.gz && \
cd ta-lib/ && \
./configure --prefix=/usr && \
make && \
make install
RUN pip install TA-Lib==0.4.25
RUN rm -R ta-lib ta-lib-0.4.0-src.tar.gz
RUN mkdir /worker
WORKDIR /worker
COPY . .
# Create a directory for logs
RUN mkdir -p /app/logs
# # Installation der erforderlichen Python-Pakete
# RUN pip install --upgrade pip
RUN pip install -r ./requirements.txt
我也将此代码作为 docker-compose 文件中的后端服务:
webapp_backend:
build: ./webapp_backend
command: sh -c "python manage.py migrate --noinput && gunicorn proj.wsgi --bind 0.0.0.0:80"
volumes:
- backend_data:/webapp_backend/media/
env_file:
- webapp_backend/.env
depends_on:
- db
- rabbitmq
现在,我想在后端容器中运行 Celery 应用程序,以便它可以捕获任务。但将其放入 webapp_backend 服务中的“命令”中,不会启动工作程序,因为终端已经忙于运行
gunicorn proj.wsgi --bind 0.0.0.0:80
命令。
我能想到的一种方法(但我无法实现)是在 webapp_backend 容器中创建两个单独的终端,两个终端在那里运行
celery -A proj worker --loglevel=info
命令。如果这个解决方案有效,请帮助我实现这一目标。但如果这不是有效的解决方案,请提出一个解决方案。
另外我需要提的是,ChatGPT 给了我一种方法,虽然它有效,但不是一个好的方法。 它建议有另一个服务,名为“celery_worker”,如下所示:
# celery_worker:
# build: ./webapp_backend
# command: celery -A proj worker --loglevel=info
# volumes:
# - backend_data:/webapp_backend/media/
# env_file:
# - webapp_backend/.env
# depends_on:
# - webapp_backend
但是正如你所看到的,这会构建另一个与 webapp_backend 完全相同的容器,并将项目的所有代码复制到这个新容器中,只是为了启动工作程序,如果我理解正确并且它实际上复制了整个容器,那么它这样做根本不是一个好方法。虽然它有效并且任务已完成,但我正在寻找另一种方法。
Celery 程序自行管理进程,并且通过单独的命令更容易设置。通常,只有一个程序在 docker 容器内运行,该容器管理所有 UNIX 进程,以便在扩展期间提供更好的信号处理机制。例如,预分叉模式下的gunicorn会生成子进程,它通过主进程(PID 1)控制子进程。
Celery 应在单独的进程中运行,最好在单独的容器中运行,以实现信号处理机制,以避免 ChatGPT 建议的意外问题。但是您可以构建您的应用程序,这样不必要的繁重库就不会导入到 django/celery 应用程序中,并使用 signatures 将函数作为字符串传递。
您还可以在单个容器中运行多个进程,其中一个进程采用 PID 1,而其他进程则在 background 中运行,以节省资源,尽管这不是推荐的方法。这些库带有内置的 POSIX 信号处理机制,将这些信号发送到这些应用程序非常重要。
或者,您可以查看 apscheduler 是否满足您的需求。它使用您的操作系统而不是外部代理来管理通信,并且无法单独自动扩展。
对于任何对此感到非常疯狂的人:
`kombu.exceptions.OperationalError: [Errno 111] Connection refused`
即使您确保将所有必需的代码添加到主项目
celery.py
、__iniy__.py
和settings.py
以及应用程序文件夹中的tasks.py
文件。
我已经尝试了近35个小时来测试用shared_task修饰的函数,并且总是收到此错误。但问题是我正在以两种方式启动的 shell 中尝试它:
python
django-admin shell
两者都会因某种我不太清楚的原因而失败。
你必须确保你的
function.delay()
是从典型的 django 执行中调用的(比如从视图中),或者你需要启动一个 python shell 来通过 python manage.py shell
手动测试你的函数调用