我有一个运行没有问题的应用程序,我最近从 APScheduler 切换到 Celery。 我已经为其设置了 redis 服务器,并且 celery 任务似乎从外部运行顺利。 我已经启动了芹菜工人并在不同的终端上击败。 每当执行任务时,我的应用程序都应该给出输出。但没有输出
我的 app.py 的代码:
#I have imported everything required
#creating the app
app = Flask(__name__, static_folder='static')
#configuring the celery app
app.config['broker_url'] = 'redis://localhost:6379/0'
app.config['result_backend'] = 'redis://localhost:6379/0'
#creating the celery app
celery = Celery(app.name, broker=app.config['broker_url'])
celery.conf.update(app.config)
app.secret_key = '9557186409'
#configuring the database
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///site.db'
db.init_app(app)
webhook_url = "https://chat.googleapis.com/v1/spaces/AAAA0XTp2i8/messages?key=AIzaSyDdI0hCZtE6vySjMm-WEfRq3CPzqKqqsHI&token=4sX2rjnS34sBcb6Q7tKN800_IHQbcHCB76lW_0rbxqw"
def send_message_to_chat(message):
data = {"text": message}
response = requests.post(webhook_url, json=data)
print(f"Message sent to chat: {response.status_code}")
with app.app_context():
#creating the databases
db.create_all()
index_routes(app)
librarian_routes(app)
student_routes(app)
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login_session_not_found'
@login_manager.user_loader
def load_user(user_id):
return users_db.query.get(user_id)
@celery.task
def test_celery():
print("Celery task executed at ", datetime.now())
@celery.task
def check_expiry():
try:
print("Executing check_expiry task at ", datetime.now())
issued = requests_db.query.filter_by(status='issued').all()
for issue in issued:
if issue.end_date and issue.end_date < datetime.now():
print(f"Issue {issue.id} is expired.")
issue.status = "completed"
db.session.commit()
print("check_expiry task completed.")
except Exception as e:
print(f"Error in check_expiry task: {e}")
@celery.task
def send_reminder():
with app.app_context():
print("Sending reminders ", datetime.now())
send_message_to_chat("Sending reminders")
current_time = datetime.now()
for user in users_db.query.all():
#check if current time is the preferred time of the user, if last reminder was not sent in the current date
if user.preferred_hour == current_time.hour:
if user.last_reminder_datetime == None or user.last_reminder_datetime.date() != current_time.date():
send_message_to_chat(f"Reminder for {user.username}")
user.last_reminder_datetime = current_time
db.session.commit()
celery.conf.beat_schedule = {
"check_expiry": {
"task": "app.check_expiry",
#runs every minute
"schedule": crontab(minute="*")
},
"send_reminder": {
"task": "app.send_reminder",
#runs every hour
"schedule": crontab(minute="*")
},
"test_celery": {
"task": "app.test_celery",
#runs every minute
"schedule": crontab(minute="*")
}
}
#starting the app
if __name__ == '__main__':
app.run()
我正在使用 venv 来运行这个应用程序。我尝试过运行worker并在venv内部和外部进行击败。
Worker初始化:
celery -A app.celery worker --loglevel=info
节拍初始化:
celery -A app.celery beat --loglevel=info
工人终端:
[2024-07-31 16:38:00,004: INFO/MainProcess] Task app.test_celery[1f068e64-4cb4-43ad-b1d3-f7cd91bd0a5f] received
[2024-07-31 16:38:00,007: INFO/MainProcess] Task app.send_reminder[9520a809-4833-43b3-9964-520d13f26fa2] received
[2024-07-31 16:38:00,008: INFO/MainProcess] Task app.check_expiry[26f91af5-c07e-4acd-b816-a36cc299a947] received
击败终端:
[2024-07-31 16:38:00,000: INFO/MainProcess] Scheduler: Sending due task test_celery (app.test_celery)
[2024-07-31 16:38:00,001: INFO/MainProcess] Scheduler: Sending due task send_reminder (app.send_reminder)
[2024-07-31 16:38:00,004: INFO/MainProcess] Scheduler: Sending due task check_expiry (app.check_expiry)
所有这些都没有Python输出
我尝试删除 debug=true 选项,我尝试创建一个简单的 celery 任务来检查内容,我尝试重新启动应用程序、worker 和beat。 我也尝试向 gpt4o 寻求帮助,但仍然一无所获。 仍然没有输出。
如果您在 Windows 上运行,请使用
celery -A app.celery worker --loglevel=info -P solo
默认的 prefork 在 Windows 中不起作用。 查看这篇文章