这是进度旋转器的代码:
import sys
import time
def spinning_cursor():
while True:
for cursor in '|/-\\':
yield cursor
spinner = spinning_cursor()
for _ in range(50):
sys.stdout.write(spinner.next())
sys.stdout.flush()
time.sleep(10)
sys.stdout.write('\b')
输出
python2.7 test.py
|
由于循环休眠了 10 秒,因此旋转速度非常慢...
如何在进程休眠时继续旋转旋转器?
您必须创建一个单独的线程。下面的示例粗略地展示了如何做到这一点。然而,这只是一个简单的例子。
import sys
import time
import threading
class SpinnerThread(threading.Thread):
def __init__(self):
super().__init__(target=self._spin)
self._stopevent = threading.Event()
def stop(self):
self._stopevent.set()
def _spin(self):
while not self._stopevent.isSet():
for t in '|/-\\':
sys.stdout.write(t)
sys.stdout.flush()
time.sleep(0.1)
sys.stdout.write('\b')
def long_task():
for i in range(10):
time.sleep(1)
print('Tick {:d}'.format(i))
def main():
task = threading.Thread(target=long_task)
task.start()
spinner_thread = SpinnerThread()
spinner_thread.start()
task.join()
spinner_thread.stop()
if __name__ == '__main__':
main()
您可以小步入睡,直到达到 10 秒:
import sys, time
def spinning_cursor():
while True:
for cursor in '|/-\\':
yield cursor
spinner = spinning_cursor()
end_time = time.time() + 10
while time.time() < end_time:
sys.stdout.write(spinner.next())
sys.stdout.flush()
time.sleep(0.2) # adjust this to change the speed
sys.stdout.write('\b')
但这会阻塞你的主线程,因此只有当你想等待 10 秒而不在 Python 程序中执行任何其他操作(例如,等待某些外部进程完成)时,它才会有用。
如果你想在旋转器旋转时运行其他Python代码,你将需要两个线程——一个用于旋转器,一个用于主要工作。你可以这样设置:
import sys, time, threading
def spin_cursor():
while True:
for cursor in '|/-\\':
sys.stdout.write(cursor)
sys.stdout.flush()
time.sleep(0.1) # adjust this to change the speed
sys.stdout.write('\b')
if done:
return
# start the spinner in a separate thread
done = False
spin_thread = threading.Thread(target=spin_cursor)
spin_thread.start()
# do some more work in the main thread, or just sleep:
time.sleep(10)
# tell the spinner to stop, and wait for it to do so;
# this will clear the last cursor before the program moves on
done = True
spin_thread.join()
# continue with other tasks
sys.stdout.write("all done\n")
生成两个线程,
A
和B
。线程 A
运行 cmd
直至完成。线程 B
显示旋转光标并等待线程 A
退出,这将在 cmd
完成时发生。此时,线程 B
清除旋转的光标,然后退出。
或者使用现有的库而不是重新发明轮子。考虑一下 progressbar 库。您将需要
RotatingMarker
进度指示器。
这里是来自 Fluent Python 的 asyncio 示例(复制到这里作为书籍内容的允许使用),它涵盖了这个确切的程序。它使用异步函数(又名协程)和
asyncio
内置 Python 库来显示旋转器,直到单独的协程等待 3 秒并返回一个值(整数 42)。您可以从本书第 19 章中阅读有关 asyncio
的更多信息。
# spinner_await.py
# credits: Example by Luciano Ramalho inspired by
# Michele Simionato's multiprocessing example in the python-list:
# https://mail.python.org/pipermail/python-list/2009-February/538048.html
# BEGIN SPINNER_AWAIT
import asyncio
import itertools
import sys
async def spin(msg): # <1>
write, flush = sys.stdout.write, sys.stdout.flush
for char in itertools.cycle('|/-\\'):
status = char + ' ' + msg
write(status)
flush()
write('\x08' * len(status))
try:
await asyncio.sleep(.1) # <3>
except asyncio.CancelledError: # <4>
break
write(' ' * len(status) + '\x08' * len(status))
async def slow_function(): # <5>
# pretend waiting a long time for I/O
await asyncio.sleep(3) # <6>
return 42
async def supervisor(): # <7>
spinner = asyncio.ensure_future(spin('thinking!')) # <8>
print('spinner object:', spinner) # <9>
result = await slow_function() # <10>
spinner.cancel() # <11>
return result
def main():
loop = asyncio.get_event_loop() # <12>
result = loop.run_until_complete(supervisor()) # <13>
loop.close()
print('Answer:', result)
if __name__ == '__main__':
main()
# END SPINNER_AWAIT