时间表存储在Redis中
即使您删除或修改调度代码,旧调度仍保留在 Redis 中,直到明确取消为止。
非声明式模型
仅删除
scheduler.cron(...)
行并不会删除计划;您必须手动运行scheduler.cancel(job_id)
。
rqscheduler
无法运行您的代码rqscheduler
命令仅轮询 Redis 来查找到期作业。它不会导入或执行您的 Python 代码来动态更新时间表。
没有内置对账
您必须自行处理计划作业的生命周期(添加、更新或删除它们),通常需要额外的脚本或手动流程。
但是,在这个特定的项目中,我使用的是 RQ,因此我无法使用 Celery Beat。
我怎样才能创建一些简单的东西,它采用 cron 字符串并以与 Celery Beat 类似的方式工作?
# scheduler.py
from __future__ import annotations
import asyncio
from collections.abc import Callable
from datetime import datetime
from typing import Any, TypedDict, Unpack
from croniter import croniter
import logfire
import rq
queue = rq.Queue(connection=...)
_schedulers: list[Callable] = []
def run_rq_scheduler():
"""
This should only be called once, by a single instance, of a single service.
"""
for s in _schedulers:
s()
class ScheduleRqJobKwargs(TypedDict, total=False):
"""
See description of `rq.Queue.enqueue_call` for detail or documentation:
https://python-rq.org/docs/#enqueueing-jobs
"""
timeout: int | None
result_ttl: int | None
ttl: int | None
failure_ttl: int | None
description: str | None
job_id: str | None
at_front: bool
meta: dict | None
retry: rq.Retry | None
on_success: rq.Callback | Callable[..., Any] | None
on_failure: rq.Callback | Callable[..., Any] | None
on_stopped: rq.Callback | Callable[..., Any] | None
def schedule_rq_job(cron: str, **kwargs: Unpack[ScheduleRqJobKwargs]):
def inner(fn: Callable):
def scheduler():
fn_name = f'{fn.__module__}.{fn.__qualname__}'
logfire.info(f'Scheduling RQ Job: {cron} | {fn_name}')
async def coro():
while True:
now = datetime.now()
next_: datetime = croniter(cron).get_next(datetime, now, False)
delta = next_ - now
await asyncio.sleep(delta.total_seconds())
logfire.info(f'Enqueuing RQ Job: {cron} | {fn_name}')
queue.enqueue(fn_name, **kwargs)
asyncio.create_task(coro()) # noqa: RUF006
_schedulers.append(scheduler)
return fn
return inner
# -----------------------------------------------
# All jobs must be scheduled here, in this file.
# -----------------------------------------------
# Examples:
@schedule_rq_job('0 4 * * *')
async def full_sync():
...
@schedule_rq_job('@hourly')
async def process_restock_subscriptions():
...
if __name__ == '__main__':
loop = asyncio.new_event_loop()
run_rq_scheduler()
loop.run_forever()