我尝试调度 2 个任务(job):
# Schedule two one-off ('date' trigger) reminders for the same order: the first
# fires 1 minute from now, the second 3 minutes from now. Each job gets its own
# id derived from the order ID.
# The job callable must use the same spelling as the coroutine that is actually
# defined (send_dealayed_message); any other name is undefined here.
# NOTE(review): the traceback shows the SQLAlchemy job store pickling the job
# state inside this add_job call, so the `bot` object passed in kwargs is
# presumably what cannot be pickled -- confirm.
scheduler.add_job(send_dealayed_message, 'date', run_date=datetime.now() + timedelta(minutes=1), kwargs={'id': order['ID'], 'bot': bot}, id=f'{order["ID"]}_first')
scheduler.add_job(send_dealayed_message, 'date', run_date=datetime.now() + timedelta(minutes=3), kwargs={'id': order['ID'], 'bot': bot}, id=f'{order["ID"]}_second')
这是对应的函数:
async def send_dealayed_message(id, bot):
    """Send the follow-up reminder message to a chat.

    Args:
        id: Forwarded to ``bot.send_message`` as ``chat_id``.
        bot: Object exposing an awaitable
            ``send_message(chat_id=..., text=...)`` (presumably an aiogram
            ``Bot``, judging by the traceback).
    """
    # The call must be awaited: without ``await`` the coroutine object is
    # created and discarded, so the message is never actually sent.
    await bot.send_message(chat_id=id, text='Seems you didn"t come to us so far, so it"s time to do it!')
这是完整的调度程序代码:
# Asyncio-based scheduler working in the Europe/Moscow timezone.
scheduler = AsyncIOScheduler(timezone='Europe/Moscow')
# Keep jobs in the project's SQLite database via the SQLAlchemy job store.
# The traceback shows this store calling pickle.dumps on the job state when a
# job is added.
sqlite_job_store = SQLAlchemyJobStore(url='sqlite:///ProjectDB.sqlite')
scheduler.add_jobstore(sqlite_job_store)
调度器在 Telegram 机器人启动时启动,并且数据库在调度器启动之前就已创建。
但是我得到了下面的错误,完整的错误信息如下:
Task exception was never retrieved
future: <Task finished name='Task-47' coro=<Dispatcher._process_polling_updates() done, defined at C:\Users\user\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\LocalCache\local-packages\Python311\site-packages\aiogram\dispatcher\dispatcher.py:407> exception=TypeError("cannot pickle '_asyncio.Task' object")>
Traceback (most recent call last):
File "C:\Users\user\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\LocalCache\local-packages\Python311\site-packages\aiogram\dispatcher\dispatcher.py", line 415, in _process_polling_updates
for responses in itertools.chain.from_iterable(await self.process_updates(updates, fast)):
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\user\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\LocalCache\local-packages\Python311\site-packages\aiogram\dispatcher\dispatcher.py", line 235, in process_updates
return await asyncio.gather(*tasks)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\user\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\LocalCache\local-packages\Python311\site-packages\aiogram\dispatcher\handler.py", line 117, in notify
response = await handler_obj.handler(*args, **partial_data)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\user\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\LocalCache\local-packages\Python311\site-packages\aiogram\dispatcher\dispatcher.py", line 256, in process_update
return await self.message_handlers.notify(update.message)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\user\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\LocalCache\local-packages\Python311\site-packages\aiogram\dispatcher\handler.py", line 117, in notify
response = await handler_obj.handler(*args, **partial_data)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "c:\Users\user\projects\TelegramBot\main.py", line 199, in get_confirm
await create_order(order, bot=bot)
File "c:\Users\user\projects\TelegramBot\DataBase.py", line 97, in create_order
scheduler.add_job(send_dealayed_message, 'date', run_date=datetime.now() + timedelta(minutes=1), kwargs={'id': order['ID'], 'bot': bot}, id=f'{order["ID"]}_first')
File "C:\Users\user\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\LocalCache\local-packages\Python311\site-packages\apscheduler\schedulers\base.py", line 447, in add_job
self._real_add_job(job, jobstore, replace_existing)
File "C:\Users\user\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\LocalCache\local-packages\Python311\site-packages\apscheduler\schedulers\base.py", line 871, in _real_add_job
store.add_job(job)
File "C:\Users\user\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\LocalCache\local-packages\Python311\site-packages\apscheduler\jobstores\sqlalchemy.py", line 97, in add_job
'job_state': pickle.dumps(job.__getstate__(), self.pickle_protocol)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: cannot pickle '_asyncio.Task' object
我也有同样的问题。这是我的最终解决方案。
from asyncio.coroutines import _is_coroutine
from typing import Optional
from aiogram import Bot
from aiogram.methods import TelegramMethod
class TelegramMethodJob:
    """Schedulable wrapper around a Telegram method object.

    The wrapped method's ``_bot`` reference is cleared when the job is
    created and re-attached from the class-level bot right before the method
    is awaited, so the stored job state does not carry the bot object.

    Instances are callable (``await job()``), which lets them be handed to a
    scheduler as the job function. ``call`` is kept as an alias for code that
    invoked the coroutine by that name.
    """

    # Marker checked by ``asyncio.iscoroutinefunction`` so that an instance is
    # recognised as a coroutine function rather than a plain callable.
    _is_coroutine = _is_coroutine
    # Bot shared by every job; registered once through ``set_bot``.
    _bot = None

    def __init__(self, method: "TelegramMethod"):
        """Store ``method`` and detach any bot currently bound to it."""
        self.method = method
        self.method._bot = None

    async def __call__(self):
        """Re-attach the registered bot, await the method, return its result."""
        # Look the bot up through the instance's own class so that a bot
        # registered on a subclass via ``set_bot`` is honoured as well.
        self.method._bot = type(self)._bot
        return await self.method

    # Backward-compatible name for the coroutine entry point.
    call = __call__

    @classmethod
    def set_bot(cls, bot: Optional["Bot"]):
        """Register (or clear, with ``None``) the bot used when jobs run."""
        cls._bot = bot
做法是:在创建任务时清除 `_bot` 成员,并在调用时重新设置,这样被序列化的任务中就不再包含 Bot 对象。
用法:
# Usage example: register the shared bot once, then schedule a wrapped method.
bot = Bot(...)
TelegramMethodJob.set_bot(bot)
scheduler = MyScheduler()
# NOTE(review): TelegramMethodJob is annotated to take a TelegramMethod;
# confirm that this expression yields such an object in the aiogram version
# in use.
reminder_job = TelegramMethodJob(bot.send_message(...))
scheduler.add_job(reminder_job)