我正在阅读 schedule API 的文档,并且在 “在后台运行”示例中,他们将 run 方法定义为 ScheduleThread 类中的类方法。代码如下:
import threading
import time
import schedule
def run_continuously(interval=1):
"""Continuously run, while executing pending jobs at each
elapsed time interval.
@return cease_continuous_run: threading. Event which can
be set to cease continuous run. Please note that it is
*intended behavior that run_continuously() does not run
missed jobs*. For example, if you've registered a job that
should run every minute and you set a continuous run
interval of one hour then your job won't be run 60 times
at each interval but only once.
"""
cease_continuous_run = threading.Event()
class ScheduleThread(threading.Thread):
@classmethod
def run(cls):
while not cease_continuous_run.is_set():
schedule.run_pending()
time.sleep(interval)
continuous_thread = ScheduleThread()
continuous_thread.start()
return cease_continuous_run
def background_job():
print('Hello from the background thread')
schedule.every().second.do(background_job)
# Start the background thread
stop_run_continuously = run_continuously()
# Do some other things...
time.sleep(10)
# Stop the background thread
stop_run_continuously.set()
我不明白他们为什么在这里使用@classmethod。通过一些研究,似乎 run() 应该始终是 Thread 子类中的实例方法。这是一个错误还是我错过了什么?
我运行了文档中未更改的代码,然后将类方法更改为实例方法(删除了装饰器并将 run(cls) 替换为 run(self)),然后再次运行代码,行为是相同的。
我预计某些东西会损坏或出现不同的行为。
该方法没有充分的理由成为
classmethod
。正如您所注意到的,它仍然有效,但它不是线程子类的常用 API。
确实,该代码的整体设计有点奇怪。如果您想将一些数据与正在运行的线程捆绑在一起,则线程子类确实有意义,但如果您打算这样做,则将整个事情实现为闭包是没有意义的。有两种更合理的选择:
首先,您可以将数据(事件)放入类中:
class run_continuously(threading.Thread):
def __init__(self, interval=1):
super().__init__()
self.interval = interval # make these values instance variables
self.cease_continuous_run = threading.Event()
self.start()
def run(self): # now we need to be an instance method, because we use instance vars
while not self.cease_continuous_run.is_set():
schedule.run_pending()
time.sleep(self.interval)
def stop(self): # the event API can now be an internal implementation detail, not
self.cease_continuous_run.set() # something the user of our code needs to know
schedule.every().second.do(background_job)
rc = run_continuously() # create the thread, which starts itself
time.sleep(10)
rc.stop()
其次,您可以放弃使用线程子类,而只创建一个具有函数目标的线程,使用闭包以与原始代码相同的方式保存数据:
def run_continuously(interval=1):
cease_continuous_run = threading.Event()
def helper():
while not cease_continuous_run.is_set():
schedule.run_pending()
time.sleep(interval)
continuous_thread = threading.Thread(target=helper)
continuous_thread.start()
return cease_continuous_run
在这两个示例中,我取消了虚假的
classmethod
装饰器,要么是因为它是必要的(因为我们想要访问实例变量),要么是因为我完全取消了该类。