使用 asyncio 安排周期性 RQ 任务

问题描述 投票:0回答:1

我认为 RQ Scheduler 的架构从根本上来说是有缺陷的,而且它比需要的复杂得多。

时间表存储在Redis中
即使您删除或修改调度代码,旧调度仍保留在 Redis 中,直到明确取消为止。

非声明式模型
仅删除

scheduler.cron(...)
行并不会删除计划;您必须手动运行
scheduler.cancel(job_id)

rqscheduler
无法运行您的代码
rqscheduler
命令仅轮询 Redis 来查找到期作业。它不会导入或执行您的 Python 代码来动态更新时间表。

没有内置对账
您必须自行处理计划作业的生命周期(添加、更新或删除它们),通常需要额外的脚本或手动流程。

我非常喜欢 Celery Beat 的调度/排队任务方法,从我的 python 代码中,恰好在计划运行的时间。

但是,在这个特定的项目中,我使用的是 RQ,因此我无法使用 Celery Beat。

我怎样才能创建一些简单的东西,它采用 cron 字符串并以与 Celery Beat 类似的方式工作?

python cron python-asyncio scheduler rq
1个回答
0
投票
# scheduler.py

from __future__ import annotations

import asyncio
from collections.abc import Callable
from datetime import datetime
from typing import Any, TypedDict, Unpack

from croniter import croniter
import logfire
import rq


queue = rq.Queue(connection=...)
_schedulers: list[Callable] = []


def run_rq_scheduler():
    """
    This should only be called once, by a single instance, of a single service.
    """
    for s in _schedulers:
        s()


class ScheduleRqJobKwargs(TypedDict, total=False):
    """
    See description of `rq.Queue.enqueue_call` for detail or documentation:
    https://python-rq.org/docs/#enqueueing-jobs
    """

    timeout: int | None
    result_ttl: int | None
    ttl: int | None
    failure_ttl: int | None
    description: str | None
    job_id: str | None
    at_front: bool
    meta: dict | None
    retry: rq.Retry | None
    on_success: rq.Callback | Callable[..., Any] | None
    on_failure: rq.Callback | Callable[..., Any] | None
    on_stopped: rq.Callback | Callable[..., Any] | None


def schedule_rq_job(cron: str, **kwargs: Unpack[ScheduleRqJobKwargs]):
    def inner(fn: Callable):
        def scheduler():
            fn_name = f'{fn.__module__}.{fn.__qualname__}'
            logfire.info(f'Scheduling RQ Job: {cron} | {fn_name}')

            async def coro():
                while True:
                    now = datetime.now()
                    next_: datetime = croniter(cron).get_next(datetime, now, False)
                    delta = next_ - now
                    await asyncio.sleep(delta.total_seconds())

                    logfire.info(f'Enqueuing RQ Job: {cron} | {fn_name}')
                    queue.enqueue(fn_name, **kwargs)

            asyncio.create_task(coro())  # noqa: RUF006

        _schedulers.append(scheduler)

        return fn

    return inner


# -----------------------------------------------
# All jobs must be scheduled here, in this file.
# -----------------------------------------------


# Examples: 
@schedule_rq_job('0 4 * * *')
async def full_sync():
    ...


@schedule_rq_job('@hourly')
async def process_restock_subscriptions():
    ...

if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    run_rq_scheduler()
    loop.run_forever()
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.