I have been using Python's schedule module to schedule overlapping jobs that must not block one another; however, the recommended way to do this is with Python's threading module (which, of course, is subject to the GIL).

Quoting the schedule module's documentation:
import threading
import time
import schedule

def job():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(1).seconds.do(run_threaded, job)
schedule.every(2).seconds.do(run_threaded, job)
schedule.every(3).seconds.do(run_threaded, job)
schedule.every(4).seconds.do(run_threaded, job)
schedule.every(5).seconds.do(run_threaded, job)

while 1:
    schedule.run_pending()
    time.sleep(1)
How can we schedule and execute jobs without worrying about the GIL?
A relatively simple way around this is to use the multiprocessing module... it gives us cron-like behavior without worrying about GIL issues.

There is nothing innovative about avoiding GIL issues with multiprocessing... I am only documenting it here in the hope that it helps future googlers...
from multiprocessing import Process
from datetime import datetime
import time
from schedule import Scheduler

class MPScheduler(Scheduler):
    def __init__(self, args=None, kwargs=None):
        if args is None:
            args = ()
        if kwargs is None:
            kwargs = {}
        super(MPScheduler, self).__init__(*args, **kwargs)
        # Among other things, this object inherits self.jobs (a list of jobs)
        self.args = args
        self.kwargs = kwargs
        self.processes = list()

    def _mp_run_job(self, job_func):
        """Spawn another process to run the job; multiprocessing avoids GIL issues"""
        job_process = Process(target=job_func, args=self.args,
                              kwargs=self.kwargs)
        job_process.daemon = True
        job_process.start()
        self.processes.append(job_process)
    def run_pending(self):
        """Run any jobs which are ready"""
        runnable_jobs = (job_obj for job_obj in self.jobs if job_obj.should_run)
        for job_obj in sorted(runnable_jobs):
            job_obj.last_run = datetime.now()  # Housekeeping
            self._mp_run_job(job_obj.job_func)
            job_obj._schedule_next_run()       # Schedule the next execution datetime
        self._retire_finished_processes()

    def _retire_finished_processes(self):
        """Walk the list of processes and retire finished processes"""
        retirement_list = list()  # Indexes of finished processes to remove
        for idx, process in enumerate(self.processes):
            if process.is_alive():
                # Wait a short time for the process to finish
                process.join(0.01)
            else:
                retirement_list.append(idx)

        # Retire finished processes (pop from the end so earlier indexes stay valid)
        for process_idx in sorted(retirement_list, reverse=True):
            self.processes.pop(process_idx)
def job(id, hungry=True):
    print("{} running {} and hungry={}".format(datetime.now(), id, hungry))
    time.sleep(10)  # This job runs without blocking execution of other jobs

if __name__ == '__main__':
    # Build a schedule of overlapping jobs...
    mp_sched = MPScheduler()
    mp_sched.every(1).seconds.do(job, id=1, hungry=False)
    mp_sched.every(2).seconds.do(job, id=2)
    mp_sched.every(3).seconds.do(job, id=3)
    mp_sched.every(4).seconds.do(job, id=4)
    mp_sched.every(5).seconds.do(job, id=5)

    while True:
        mp_sched.run_pending()
        time.sleep(1)
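One thing to keep in mind: because the job processes are daemonized, they are killed as soon as the main process exits. If you want in-flight jobs to finish on shutdown, one option (a small sketch of my own, not part of the answer above) is to catch KeyboardInterrupt and join whatever processes are still tracked in mp_sched.processes before exiting:

if __name__ == '__main__':
    # Same MPScheduler as above; Ctrl-C stops scheduling new jobs,
    # then we wait for any still-running job processes to finish.
    mp_sched = MPScheduler()
    mp_sched.every(2).seconds.do(job, id=1)
    try:
        while True:
            mp_sched.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        for proc in mp_sched.processes:
            proc.join()  # let running jobs complete before the daemons are torn down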