我创建了 2 个后台调度程序。两个调度程序上的任务启动后,我调用 scheduler.shutdown(wait=True),但它并不会等待任务完成。我用一个 while 循环(通过 scheduler.get_jobs() 检查尚未执行的作业)绕过了这个问题,但按照文档的说法,这应该不是必需的,因为 shutdown(wait=True) 正是用来做这件事的——还是说我把文档理解错了?
import time
import os
import random
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
from datetime import datetime,timezone,timedelta
from zoneinfo import ZoneInfo
# Shared between the two job functions: task1() assigns it and task2() reads it.
# It stays None until task1() has run at least once.
some_data = None
def time_now():
    """Return the current time as a timezone-aware datetime in the local zone.

    The conversion uses ``astimezone()`` without an argument rather than
    ``ZoneInfo('localtime')``: "localtime" is not an IANA zone name and only
    resolves on systems whose tz database directory happens to contain a
    file with that name, so that lookup fails elsewhere.
    """
    return datetime.now(timezone.utc).astimezone()
def time_now_str():
    """Return the current local time formatted as ``yy-mm-dd HH:MM:SS.ffffff``."""
    return time_now().strftime('%y-%m-%d %H:%M:%S.%f')
def random_int():
    """Return a random 11-digit integer.

    Both bounds are inclusive: 10_000_000_000 <= result <= 99_999_999_999.
    """
    return random.randint(10_000_000_000, 99_999_999_999)
def task1(taskid):
    """Run the dummy workload, then publish a fresh random value in ``some_data``.

    Args:
        taskid: Label used only in the log line printed by this job.

    Returns:
        The newly generated value, which is also stored in the module-level
        ``some_data`` so that task2() can read it.
    """
    global some_data
    # Dummy task: copy 10000 blocks of 10 bytes from /dev/zero to /dev/null.
    os.system('/usr/bin/dd if=/dev/zero of=/dev/null bs=10 count=10000 2> /dev/null')
    some_data = random_int()
    print(f'## Task1 ID: {taskid}, Generated some random data = {some_data}, {time_now_str()}')
    return some_data
def task2(taskid):
    """Process the value most recently published by task1(), if there is one.

    Prints a warning and returns immediately when ``some_data`` is still
    None; otherwise logs a start line, runs the dummy workload and logs a
    completion line.

    Args:
        taskid: Label used only in the log lines printed by this job.
    """
    # Snapshot the shared value once so the STARTED and DONE lines report the
    # same data even if task1() replaces it while the workload is running.
    curr_data = some_data
    if curr_data is None:
        print(f'-- Task2 ID: {taskid}, WARNING, no data yet, looks like task1() has not run yet, {time_now_str()}')
        return

    print(f'-- Task2 ID: {taskid}, STARTED task2 for data = {curr_data}: {time_now_str()}')
    # Dummy task: copy 5000000 blocks of 10 bytes from /dev/zero to /dev/null.
    os.system('/usr/bin/dd if=/dev/zero of=/dev/null bs=10 count=5000000 2> /dev/null')
    print(f'-- Task2 ID: {taskid}, DONE task2 for data = {curr_data}: {time_now_str()}')
## main()
# Each scheduler gets its own single-worker thread pool, so task1 jobs and
# task2 jobs run on separate threads while jobs on the same scheduler run one
# at a time.
task1_scheduler = BackgroundScheduler(
    executors={'default': ThreadPoolExecutor(1)},
    job_defaults={'misfire_grace_time': 500, 'coalesce': False},
)
task2_scheduler = BackgroundScheduler(
    executors={'default': ThreadPoolExecutor(1)},
    job_defaults={'misfire_grace_time': 500, 'coalesce': False},
)

start_time = time_now()
print('Start Time: ' + start_time.strftime('%y-%m-%d %H:%M:%S.%f'))

# The intervals are cumulative: each job runs `interval` seconds after the
# previous job of the same kind.
task1_tm = start_time
for idx, interval in enumerate([1, 8, 14], start=1):
    task1_id = 'task1_id_' + str(idx)
    task1_tm += timedelta(seconds=interval)
    # Register on task1_scheduler; the original added these jobs to
    # task2_scheduler, which left task1_scheduler without any job.
    task1_scheduler.add_job(task1, 'date', run_date=task1_tm, args=[task1_id], id=task1_id, max_instances=1)

task2_tm = start_time
for idx, interval in enumerate([0, 4, 19, 28], start=1):
    task2_id = 'task2_id_' + str(idx)
    task2_tm += timedelta(seconds=interval)
    task2_scheduler.add_job(task2, 'date', run_date=task2_tm, args=[task2_id], id=task2_id, max_instances=1)

task1_scheduler.start()
task2_scheduler.start()

# shutdown(wait=True) only waits for jobs that are already executing; jobs
# still waiting for their run date would never start. Poll until both
# schedulers have no pending jobs left before shutting down.
while task1_scheduler.get_jobs() or task2_scheduler.get_jobs():
    time.sleep(1)

task1_scheduler.shutdown(wait=True)
task2_scheduler.shutdown(wait=True)
print(f'Tasks completed: {time_now_str()}\n')
实际上,简短的答案是您的任务太快了,所以没有什么可等待的:
第一个任务需要 0.011 s,第二个任务需要 0.0001 s
仔细观察您的工作流程,我看到以下内容:
for idx, interval in enumerate([1, 8, 14], start=1):
任务 1 的第一个间隔是 1 s,而任务 2 的循环是:
for idx, interval in enumerate([0, 4, 19, 28], start=1)
因此,任务 2 实际上先于任务 1 启动,此时还没有任何数据可供读取,随后您的代码就结束了。
另外,下面这一行您想用的更可能是 task1_scheduler(而不是 task2_scheduler):
task2_scheduler.add_job(task1,'date', run_date=task1_tm, args=[task1_id], id=task1_id, max_instances=1)
无论如何,看来你没有以正确的方式使用这个库。让我给你举个例子以防万一:
def get_jobs() -> list[dict[str, Any]]:
    """Get the list of prepared jobs.

    Returns:
        One dict per job. Each dict holds keyword arguments for the
        scheduler's ``add_job`` call (id, func, replace_existing, trigger).
    """
    return [
        {
            "id": "your_function_here_id",
            # Placeholder: substitute the callable that should be scheduled.
            "func": your_function_here,
            "replace_existing": True,
            # Cron trigger for 23:30 UTC.
            "trigger": CronTrigger(hour=23, minute=30, timezone=pytz.utc),
        },
        # More jobs to run
    ]
async def start_worker() -> None:
    """Start worker containing cron jobs.

    Builds an AsyncIOScheduler with an in-memory job store, registers every
    job returned by get_jobs(), prints the job list and starts the scheduler.
    The coroutine returns right after ``scheduler.start()``.
    """
    # NOTE(review): nothing here keeps the event loop alive; jobs presumably
    # only fire while the loop that ran this coroutine keeps running, so
    # confirm the caller does that and eventually shuts the scheduler down.
    scheduler = AsyncIOScheduler(
        jobstores={"default": MemoryJobStore()},
        executors={"default": AsyncIOExecutor()},
        job_defaults={"coalesce": False, "max_instances": 1, "misfire_grace_time": 300},
        timezone="UTC",
    )
    for job in get_jobs():
        scheduler.add_job(**job)
    scheduler.print_jobs()
    scheduler.start()
if __name__ == "__main__":
    # Create the loop explicitly: calling asyncio.get_event_loop() without a
    # running loop is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(start_worker())
    try:
        # start_worker() returns as soon as the scheduler has been started,
        # so keep the loop running; otherwise the script would exit before
        # any scheduled job gets a chance to fire.
        loop.run_forever()
    except (KeyboardInterrupt, SystemExit):
        # Ctrl+C / interpreter exit is the expected way to stop the worker.
        pass
    finally:
        loop.close()
您最好再加上 try / finally,并在 finally 中调用 shutdown,不过大致思路就是上面这样。希望对您有帮助!