I have been trying to set up redbeat with my Flask application, but I can't get it to start properly. When I run the command
celery -A celery_config.celery beat --scheduler redbeat.RedBeatScheduler
I get the following error, which makes it look like redis_url in the conf is None:
  File "/Users/test/Documents/Projects/Project/.venv/lib/python3.9/site-packages/redbeat/schedulers.py", line 401, in setup_schedule
    client = get_redis(self.app)
  File "/Users/test/Documents/Projects/Project/.venv/lib/python3.9/site-packages/redbeat/schedulers.py", line 124, in get_redis
    elif conf.redis_url.startswith('redis-sentinel') and 'sentinels' in redis_options:
AttributeError: 'NoneType' object has no attribute 'startswith'
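I assume my setup must be wrong for this error to occur, but I'm not sure where or how. Any guidance on getting this up and running would be much appreciated. Below is some stripped-down code showing how I set it up and how I'm trying to use it.
The application.py file, where I create the Flask application and bind application.celery_app to the celery instance created in the celery_config file: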
from flask import Flask, redirect, url_for
from extensions import ma, jwt, db, socketio
import config.const as CONSTANTS
from web.authentication import auth
from web.tasks import tasks
from flask_cors import CORS
from celery_config import init_celery

def create_app():
    application = Flask(__name__)
    configure_app(application)
    application.celery_app = init_celery(application)
    return application

def configure_app(application):
    application.config['SQLALCHEMY_DATABASE_URI'] = CONSTANTS.DB_CONNECTION_STRING
    application.config['SECRET_KEY'] = CONSTANTS.SECRET
    application.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    application.config['PREFERRED_URL_SCHEME'] = 'https'

application = create_app()

if __name__ == '__main__':
    application.run(debug=True, use_reloader=False, port=5009)
Here is celery_config.py, where celery is created and configured. I'm running redis locally in a terminal window.
from celery import Celery

celery = None

def make_celery(app):
    app.config.update(
        CELERY_BROKER_URL='redis://localhost:6379/0',
        CELERY_RESULT_BACKEND='redis://localhost:6379/0',
        REDBEAT_REDIS_URL='redis://localhost:6379/1'
    )
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update({
        'beat_scheduler': 'redbeat.RedBeatScheduler',
        'beat_schedule': {},
        'redbeat_key_prefix': 'redbeat:',
        'redbeat_redis_url': app.config['REDBEAT_REDIS_URL']
    })
    return celery

def init_celery(app):
    global celery
    celery = make_celery(app)
    return celery
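Note that in this module the global `celery` stays `None` until `init_celery(app)` has actually run, so anything that reads it earlier gets `None`. As a minimal, stdlib-only sketch of one way to make that failure obvious, the accessor below raises a clear error instead of handing back `None`. The name `get_celery` and the `object()` placeholder are hypothetical stand-ins, not part of the original code or of Celery.

```python
_celery = None  # set by init_celery(), mirroring the module-level global above


def init_celery(app):
    """Stand-in for the real init_celery: stores a placeholder instance."""
    global _celery
    _celery = object()  # hypothetical placeholder for make_celery(app)
    return _celery


def get_celery():
    """Return the configured instance, or fail with a clear message."""
    if _celery is None:
        raise RuntimeError("init_celery(app) has not been called yet")
    return _celery


# Reading the instance before initialisation fails loudly.
try:
    get_celery()
    error_before = None
except RuntimeError as exc:
    error_before = str(exc)

# After initialisation the accessor returns the stored instance.
instance = init_celery(app=None)
same_instance = get_celery() is instance
```

With an accessor like this, code that needs the instance would call `get_celery()` at the point of use rather than importing the global by name.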
Finally, this is the file where I add, edit, and delete tasks. I suspect the error comes from here, where I use RedBeatSchedulerEntry() to create a new entry:
from celery.schedules import crontab
from flask import Blueprint, jsonify, request
from models.task import Task
from services.login_decorator import duo_login_required
import services.helpers as helpers
from datetime import datetime, timedelta
from redbeat import RedBeatSchedulerEntry
from celery_config import celery as celery_app

tasks = Blueprint('tasks', __name__)

@tasks.route('/create_task', methods=['POST'])
def create_new_task():
    try:
        body = request.json
        task = Task()
        if body['interval'] == True:
            task.name = body['name']
            task.type = body['type']
            task.url = body['url']
            task.interval = True
            task.interval_time = body['time']
            task.update()
            key = task.id
            args = [task.id, task.url]
            schedule = timedelta(minutes=int(task.interval_time))
            entry = RedBeatSchedulerEntry(key, 'myapp.tasks.task_function', schedule, args=args, kwargs={},
                                          app=celery_app)
            entry.save()
            return jsonify({'id': task.id, 'name': task.name, 'type': task.type,
                            'interval_time': body['time'], 'interval': True, 'url': task.url}), 200
    # ... the rest of the handler, including the except clause, is omitted here
The None error usually occurs when a proper application is not passed in app=, in this case to RedBeatSchedulerEntry.
Try changing
from celery_config import celery as celery_app
to
from celery_config import make_celery as celery_app
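To illustrate why the original import can yield None, here is a minimal, stdlib-only sketch. `from module import name` copies whatever the name refers to at import time, so a later rebinding through `global` inside the module is not seen by the importer. The module name `fake_celery_config` and the dict used in place of a Celery instance are hypothetical stand-ins for the files in the question.

```python
import sys
import types

# Build an in-memory stand-in for celery_config.py (hypothetical, stdlib only).
source = '''
celery = None

def init_celery(app):
    global celery
    celery = {"app": app}  # stand-in for a configured Celery instance
    return celery
'''
fake_config = types.ModuleType("fake_celery_config")
exec(source, fake_config.__dict__)
sys.modules["fake_celery_config"] = fake_config

# What web/tasks.py does: bind the name at import time, while it is still None.
from fake_celery_config import celery as celery_app
import fake_celery_config

# What create_app() does afterwards.
fake_celery_config.init_celery("flask-app")

print(celery_app)                 # None: the early binding was never updated
print(fake_celery_config.celery)  # {'app': 'flask-app'}
```

Looking the attribute up on the module at call time (`fake_celery_config.celery`) sees the initialised value, whereas the name imported earlier does not. Note also that `make_celery` in the question is a factory function that takes the Flask app and returns the Celery instance, so it would presumably need to be called, for example as `make_celery(app)`, before its result is passed as `app=`.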