我正在寻找一种使用 Tkinter 进度条跟踪多处理任务的方法。使用
tqdm
可以非常直接地完成此操作,以便在终端中显示。
我不想使用
tqdm
,而是想使用ttk.Progressbar
,但我对此所做的所有尝试,任务都会阻止尝试更新进度条(例如使用 update_idletasks 等)。以下是我正在寻找的解决方案的模板:
import time
from multiprocessing import Pool
from tqdm import tqdm
import tkinter as tk
import tkinter.ttk as ttk
def task(x):
time.sleep(0.1)
return x * x
def start_task():
num_processes = 12
num_tasks = 100
with Pool(processes=num_processes) as pool:
with tqdm(total=num_tasks, desc="Processing") as pbar:
def update_progress(_):
# <Insert update to tk progress bar here>
pbar.update(1)
for i in range(num_tasks):
pool.apply_async(task, args=(i,), callback=update_progress)
pool.close()
pool.join()
if __name__ == "__main__":
root = tk.Tk()
root.title("Task Progress")
progress_bar = ttk.Progressbar(root, maximum=100, length=300)
progress_bar.pack(pady=20)
button = tk.Button(text="Start", command=start_task)
button.pack(fill="x", padx=10, pady=10)
root.mainloop()
在解决方案中,我还想获取任务的输出(在本例中是 x*x 的列表)。
如果另一个多处理结构可以更好地工作,请随意调整(池似乎是最简单的演示)。
这是一个以前在 Stack Overflow 上被问过的问题,但我之前找到的所有答案都不是最小的例子,我发现它们不是很有帮助。
这是一个可以做到这一点的简单代码。我用
multiprocessing
在单独的进程中执行任务Queue
用于主进程和工作进程之间的通信,在本例中为 task
函数root.update_idletasks()
更新主线程中的进度条mp.queues.Empty
优雅地处理潜在的异常import time
import multiprocessing as mp
import tkinter as tk
import tkinter.ttk as ttk
def task(return_queue, num_tasks):
# perform the tasks and update the progress bar
for i in range(num_tasks):
time.sleep(0.1)
# return a tuple with the task number for the progressbar
# and the value equivalent to your x * x
return_queue.put((i, i * i))
def start_task():
num_tasks = 100
return_queue = mp.Queue() # queue for inter-process communication
process = mp.Process(target=task, args=(return_queue, num_tasks))
process.start()
list_returnded_values = [] # list to store the returned values
while process.is_alive():
try:
task_return = return_queue.get(timeout=1)
progress_bar['value'] = task_return[0] # task number for progressbar
root.update_idletasks()
list_returnded_values.append(task_return[1]) # value of x*x
except mp.queues.Empty:
pass
process.join() # wait for the process to finish
print(list_returnded_values)
if __name__ == "__main__":
root = tk.Tk()
root.title("Task Progress")
progress_bar = ttk.Progressbar(root, maximum=100, length=300, mode="determinate")
progress_bar.pack(pady=20)
button = tk.Button(text="Start", command=start_task)
button.pack(fill="x", padx=10, pady=10)
root.mainloop()