APScheduler Scheduler.shutdown(wait=True) does not wait for background jobs to finish

Question (votes: 0, answers: 1)

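I'm creating 2 background schedulers. After the jobs on both schedulers have started, I call scheduler.shutdown(wait=True), but it does not wait for the jobs to finish. I worked around this with a while loop that checks for pending jobs using scheduler.get_jobs(), but from reading the docs that shouldn't be necessary, since shutdown(wait=True) exists for exactly this purpose. Or am I misreading it?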

import time
import os
import random

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
from datetime import datetime,timezone,timedelta
from zoneinfo import ZoneInfo

some_data = None # task1() updates it and task2() reads it

def time_now():
    return datetime.now(timezone.utc).astimezone(ZoneInfo('localtime'))
    
def time_now_str():
    return time_now().strftime('%y-%m-%d %H:%M:%S.%f')

def random_int():
    return random.randint(10000000000, 99999999999)

def task1(taskid):
    global some_data
    os.system('/usr/bin/dd if=/dev/zero of=/dev/null bs=10 count=10000 2> /dev/null') ## dummy task
    some_data =random_int()
    print(f'## Task1 ID: {taskid}, Generated some random data = {some_data}, '+time_now_str())
    return some_data

def task2(taskid):
    global some_data
    if some_data is not None:
        curr_data = some_data
        print(f'-- Task2 ID: {taskid}, STARTED task2 for data = {curr_data}: '+time_now_str())
        os.system('/usr/bin/dd if=/dev/zero of=/dev/null bs=10 count=5000000 2> /dev/null') ## dummy task
        print(f'-- Task2 ID: {taskid}, DONE task2 for data = {curr_data}: '+time_now_str())
    else:
        print(f'-- Task2 ID: {taskid}, WARNING, no data yet, looks like task1() has not run yet, '+time_now_str())

        
## main()
 
task1_scheduler = BackgroundScheduler(executors={'default': ThreadPoolExecutor(1)}, job_defaults={'misfire_grace_time': 500, 'coalesce': False})
task2_scheduler = BackgroundScheduler(executors={'default': ThreadPoolExecutor(1)}, job_defaults={'misfire_grace_time': 500, 'coalesce': False})
        
start_time = time_now()

print('Start Time: '+start_time.strftime('%y-%m-%d %H:%M:%S.%f'))

task1_tm = start_time
for idx, interval in enumerate( [1, 8, 14], start=1):
    task1_id = 'task1_id_'+str(idx)
    task1_tm += timedelta(seconds=interval)
    task2_scheduler.add_job(task1,'date',  run_date=task1_tm,  args=[task1_id], id=task1_id, max_instances=1)

task2_tm=start_time
for idx, interval in enumerate([0, 4,19,28], start=1):
    task2_id =  'task2_id_'+str(idx)
    task2_tm += timedelta(seconds=interval)
    task2_scheduler.add_job(task2,'date',  run_date=task2_tm, args=[task2_id], id =task2_id, max_instances=1)

task1_scheduler.start()
task2_scheduler.start()

# I should not need this
while len(task1_scheduler.get_jobs()) > 0 or len(task2_scheduler.get_jobs()) > 0:
    time.sleep(1)

task1_scheduler.shutdown(wait=True)
task2_scheduler.shutdown(wait=True)
    
print(f'Tasks completed: {time_now_str()}\n')
  • OS: Linux Mint 21.2
  • APScheduler: v3.10.4
  • Python: v3.10.12
python apscheduler
Answer (votes: 0)

Actually, the short answer is that your tasks are too fast, so there is nothing to wait for:

The first task takes 0.011 s and the second takes 0.0001 s.
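For context, the APScheduler 3.x documentation describes shutdown(wait=True) as waiting until all currently executing jobs have finished; jobs whose run date is still in the future are not waited for. The sketch below is a hypothetical stdlib model of that contract only (ToyDateScheduler is not APScheduler code and ignores job stores, misfires and real triggers): a job that is already running delays shutdown, while a job scheduled for later is simply dropped.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ToyDateScheduler:
    """Hypothetical stdlib model of one-shot 'date' jobs; NOT APScheduler's code.

    It mimics only the documented shutdown(wait=True) contract: wait for jobs that
    are already executing, and drop jobs whose run time has not come yet.
    """

    def __init__(self, workers: int = 1) -> None:
        self._pool = ThreadPoolExecutor(max_workers=workers)
        self._timers: list[threading.Timer] = []
        self._lock = threading.Lock()
        self._stopped = False

    def add_job(self, func, delay: float, *args) -> None:
        def fire() -> None:
            # Hand the job to the worker pool when its run time arrives,
            # unless shutdown() has already been called.
            with self._lock:
                if not self._stopped:
                    self._pool.submit(func, *args)

        self._timers.append(threading.Timer(delay, fire))

    def start(self) -> None:
        for timer in self._timers:
            timer.start()

    def shutdown(self, wait: bool = True) -> None:
        with self._lock:
            self._stopped = True
        for timer in self._timers:
            timer.cancel()  # pending jobs are discarded, not awaited
        self._pool.shutdown(wait=wait)  # waits only for already-submitted jobs


finished: list[str] = []


def job(name: str, seconds: float) -> None:
    time.sleep(seconds)  # stand-in for real work
    finished.append(name)


scheduler = ToyDateScheduler()
scheduler.add_job(job, 0.0, "due_now", 0.3)
scheduler.add_job(job, 5.0, "due_later", 0.0)
scheduler.start()
time.sleep(0.1)  # "due_now" is running; "due_later" is still 5 s away
scheduler.shutdown(wait=True)
print(finished)  # ['due_now']
```

In this model shutdown(wait=True) blocks for roughly 0.2 s until "due_now" finishes, but "due_later" never runs, which matches the behaviour described in the question.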

Looking more closely at your workflow, I see the following:

  • Your first task starts after an interval of 1 s, according to for idx, interval in enumerate([1, 8, 14], start=1):
  • The second one starts after an interval of 0 s, according to for idx, interval in enumerate([0, 4, 19, 28], start=1)

So task 2 actually starts before task 1, but it has no data to read yet, and your code finishes.
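As a quick check on that ordering: the question's loops add each interval to a running total (task1_tm += timedelta(seconds=interval)), so the run dates are cumulative offsets from start_time. The snippet below only redoes that arithmetic with the interval lists from the question; it does not use APScheduler, and the offsets are nominal (actual start times also depend on thread scheduling).

```python
from itertools import accumulate

# Interval lists copied from the question; each run_date is start_time + running sum.
task1_offsets = list(accumulate([1, 8, 14]))      # seconds after start_time
task2_offsets = list(accumulate([0, 4, 19, 28]))

# Merge both job lists into one timeline ordered by nominal run offset.
timeline = sorted(
    [(t, f"task1_id_{i}") for i, t in enumerate(task1_offsets, start=1)]
    + [(t, f"task2_id_{i}") for i, t in enumerate(task2_offsets, start=1)]
)
for offset, job_id in timeline:
    print(f"+{offset:>2}s  {job_id}")
# The first two lines are task2_id_1 at +0 s, then task1_id_1 at +1 s.
```

Note that task1_id_3 and task2_id_3 both fall at +23 s, so their relative order is not guaranteed, and the last job (task2_id_4) is not due until +51 s.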

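More likely you meant task1_scheduler instead of task2_scheduler in this line:

task2_scheduler.add_job(task1,'date',  run_date=task1_tm,  args=[task1_id], id=task1_id, max_instances=1)

that is:

task1_scheduler.add_job(task1, 'date', run_date=task1_tm, args=[task1_id], id=task1_id, max_instances=1)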

In any case, it looks like you're not using this library the intended way. Here is an example, just in case:

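import asyncio
from typing import Any

import pytz
from apscheduler.executors.asyncio import AsyncIOExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger


async def your_function_here() -> None:
    """Placeholder job: replace with your own function."""
    print("job ran")


def get_jobs() -> list[dict[str, Any]]:
    """Get the list of prepared jobs."""

    return [
        {
            "id": "your_function_here_id",
            "func": your_function_here,
            "replace_existing": True,
            # Run every day at 23:30 UTC
            "trigger": CronTrigger(hour=23, minute=30, timezone=pytz.utc),
        },
        # More jobs to run
    ]


async def start_worker() -> None:
    """Start worker containing cron jobs."""

    scheduler = AsyncIOScheduler(
        jobstores={"default": MemoryJobStore()},
        executors={"default": AsyncIOExecutor()},
        job_defaults={"coalesce": False, "max_instances": 1, "misfire_grace_time": 300},
        timezone="UTC",
    )

    for job in get_jobs():
        scheduler.add_job(**job)

    scheduler.print_jobs()
    scheduler.start()

    # AsyncIOScheduler only fires jobs while the event loop is running,
    # so keep this coroutine alive instead of returning right away.
    await asyncio.Event().wait()


if __name__ == "__main__":
    asyncio.run(start_worker())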

It would be better to also add try / finally together with shutdown, but this shows the idea. Hope it helps!
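The try / finally suggestion can be sketched without APScheduler itself. StubScheduler below is a hypothetical stand-in that only exposes start() and shutdown(wait=True); with the real AsyncIOScheduler you would presumably call its start() and shutdown() in the same two places. Because the wait sits inside try, the finally block runs when the task is cancelled (for example when asyncio.run() is interrupted with Ctrl+C), so shutdown is never skipped.

```python
import asyncio


class StubScheduler:
    """Hypothetical stand-in exposing the same start()/shutdown() calls as a scheduler."""

    def __init__(self) -> None:
        self.running = False

    def start(self) -> None:
        self.running = True

    def shutdown(self, wait: bool = True) -> None:
        self.running = False


async def run_worker(scheduler: StubScheduler) -> None:
    scheduler.start()
    try:
        # Keep the coroutine (and so the event loop) alive until it is cancelled.
        await asyncio.Event().wait()
    finally:
        # Runs on cancellation or on an error, so the scheduler is always shut down.
        scheduler.shutdown(wait=True)


async def demo() -> bool:
    scheduler = StubScheduler()
    task = asyncio.create_task(run_worker(scheduler))
    await asyncio.sleep(0.01)  # let the worker start
    task.cancel()              # simulate stopping the service
    try:
        await task
    except asyncio.CancelledError:
        pass
    return scheduler.running


print(asyncio.run(demo()))  # False
```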

© www.soinside.com 2019 - 2024. All rights reserved.