我遇到一个问题,APScheduler 作业无法正确运行。他们使用父抽象类的方法,甚至有自己的实现。通过命令触发启动功能时不会发生这种情况。所有这些都在 telegram API 机器人内部。似乎只有
RedisJobStore
才会出现此错误
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
import asyncio
import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.redis import RedisJobStore
logging.basicConfig(filename='logfile.log', level=logging.INFO)
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
class MyAbstractClass(ABC):
@classmethod
@abstractmethod
def get_required_members(cls):
raise NotImplementedError
@classmethod
async def initiate_all(cls):
members = cls.get_required_members()
logging.info(f'Got {members=}')
...
class MyImplementation(MyAbstractClass):
@classmethod
def get_required_members(cls):
return ['Alex', 'Anna']
@classmethod
def append_to_scheduler(cls, scheduler: AsyncIOScheduler, run_date: datetime):
return scheduler.add_job(
func=cls.initiate_all,
trigger='date',
run_date=run_date
)
redis_job_store = RedisJobStore()
scheduler = AsyncIOScheduler(jobstores={'default': redis_job_store}, logger=logging.getLogger())
scheduler.start()
run_date = datetime.now() + timedelta(seconds=5)
MyImplementation.append_to_scheduler(scheduler=scheduler, run_date=run_date)
asyncio.get_event_loop().run_forever()
错误:
Job "MyAbstractClass.initiate_all (trigger: date[2023-08-04 19:01:03 MSK], next run at: 2023-08-04 19:01:03 MSK)" raised an exception
Traceback (most recent call last):
File "/media/russich555/hdd/Programming/Freelance/YouDo/29.2pilot/venv/lib/python3.11/site-packages/apscheduler/executors/base_py3.py", line 30, in run_coroutine_job
retval = await job.func(*job.args, **job.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/media/russich555/hdd/Programming/Freelance/YouDo/29.2pilot/mre/api.py", line 13, in initiate_all
members = cls.get_required_members()
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/media/russich555/hdd/Programming/Freelance/YouDo/29.2pilot/mre/api.py", line 9, in get_required_members
raise NotImplementedError
NotImplementedError
附注我将此发布为问题
这是来自 agronholm/apscheduler@5c69150 (v3.1.0)(当前版本:v3.10.1)的回归。
它会影响所有使用
job.__getstate__()
的就业商店:
MongoDBJobStore
RedisJobStore
RethinkDBJobStore
SQLAlchemyJobStore
ZooKeeperJobStore
您可以修补
apscheduler.util.get_callable_name
以不使用func.__qualname__
:
from apscheduler import util
from inspect import isclass
def get_callable_name(func):
# `func.__qualname__` doesn't work for inherited methods that call subclass methods
# if hasattr(func, '__qualname__'):
# return func.__qualname__
f_self = getattr(func, '__self__', None) or getattr(func, 'im_self', None)
if f_self and hasattr(func, '__name__'):
f_class = f_self if isclass(f_self) else f_self.__class__
else:
f_class = getattr(func, 'im_class', None)
if f_class and hasattr(func, '__name__'):
return '%s.%s' % (f_class.__name__, func.__name__)
if hasattr(func, '__call__'):
if hasattr(func, '__name__'):
return func.__name__
return func.__class__.__name__
raise TypeError('Unable to determine a name for %r -- maybe it is not a callable?' % func)
util.get_callable_name = get_callable_name