我目前正在尝试对由Raspberry Pi控制的机械臂进行编程。
到目前为止,一切正常,除了一件事,我已经在Google上搜索并尝试了许多小时,但找不到有效的解决方案。
[为了使机械臂运动,必须同时用螺纹同时运行所有电动机(正常工作)。
我的问题是,一旦完成运动,但其他电机仍在运行(线程),我需要更新一个标签,该标签显示轴(电机)的当前角度。
经过大量研究,我认为我是通过使用队列和Tkinters后方法找到了解决方案的。但是它仍然不起作用,因为标签文本仅在所有线程终止后才更新。
我编写了一个示例代码,我想获取电动机“ one”的标签更新,该标签更新将在电动机“ two”(500次迭代)之前完成其for循环(100次迭代)。我希望标签会在第二个电动机仍在运行时,在第一个电动机达到其目标时得到更新。
但是,尽管我使用了后方法,但它仍要等到电动机2完成后才能更新标签。
希望你能帮助我!
from tkinter import *
import threading
import time
from queue import *
class StepperMotors:
def __init__(self, root):
self.root = root
self.start_btn = Button(root, text="Start", command=lambda:self.start_movement())
self.start_btn.config(width = 10)
self.start_btn.grid(row=1,column=1)
self.label_one = Label(root, text='')
self.label_one.config(width = 10)
self.label_one.grid(row=2, column=1)
self.label_two = Label(root, text='')
self.label_two.config(width = 10)
self.label_two.grid(row=3, column=1)
def start_movement(self):
self.thread_queue = Queue()
self.root.after(100, self.wait_for_finish)
thread_one = threading.Thread(target=self.motor_actuation, args=(1,100))
thread_two = threading.Thread(target=self.motor_actuation, args=(2,500))
thread_one.start()
thread_two.start()
thread_one.join()
thread_two.join()
def motor_actuation(self, motor, iterations):
for i in range(iterations):
i = i+1
update_text = str(motor) + " " + str(i) + "\n"
print(update_text)
time.sleep(0.01)
self.thread_queue.put(update_text)
def wait_for_finish(self):
try:
self.text = self.thread_queue.get()
self.label_one.config(text=self.text)
except self.thread_queue.empty():
self.root.after(100, self.wait_for_finish)
if __name__ == "__main__":
root = Tk()
root.title("test")
stepper = StepperMotors(root)
root.mainloop()
最好使用非阻塞的守护线程。
[另外,最好设置separation of concerns:机器人(或机器人手臂)可以是具有自己生存时间的对象:守护线程。同上,您可以定义一个“ LabelUpdater”来读取机械手的状态并更新标签。
让我们定义一个机器人:
class Robot(threading.Thread):
def __init__(self, name: str, label_queue: queue.Queue, end_pos: int):
super().__init__(name=name)
self.daemon = True
self.label_queue = label_queue
self.end_pos = end_pos
def run(self) -> None:
for angle in range(self.end_pos):
self.label_queue.put(angle)
time.sleep(0.01)
让我们定义LabelUpdater:
class LabelUpdater(threading.Thread): def __init__(self, name: str, label_queue: queue.Queue, root_app: tkinter.Tk, variable: tkinter.Variable): super().__init__(name=name) self.daemon = True self.label_queue = label_queue self.root_app = root_app self.variable = variable def run(self) -> None: # run forever while True: # wait a second please time.sleep(1) # consume all the queue and keep only the last message last_msg = None while True: try: msg = self.label_queue.get(block=False) except queue.Empty: break last_msg = msg self.label_queue.task_done() if last_msg: self.variable.set(last_msg) self.root_app.update_idletasks()
然后,主应用程序应定义:
tkinter.StringVar
变量将被更新,class StepperMotors: def __init__(self, root): self.root = root self.label_one_queue = queue.Queue() self.label_two_queue = queue.Queue() self.start_btn = tkinter.Button(root, text="Start", command=lambda: self.start_movement()) self.start_btn.config(width=10) self.start_btn.grid(row=1, column=1) self.text_one = tkinter.StringVar() self.text_one.set("one") self.label_one = tkinter.Label(root, textvariable=self.text_one) self.label_one.config(width=10) self.label_one.grid(row=2, column=1) self.text_two = tkinter.StringVar() self.text_two.set("two") self.label_two = tkinter.Label(root, textvariable=self.text_two) self.label_two.config(width=10) self.label_two.grid(row=3, column=1) self.robot_one = Robot("robot_one", self.label_one_queue, 100) self.robot_two = Robot("robot_two", self.label_two_queue, 500) self.updater_one = LabelUpdater("updater_one", self.label_one_queue, self.root, self.text_one) self.updater_two = LabelUpdater("updater_two", self.label_two_queue, self.root, self.text_two) self.updater_one.start() self.updater_two.start() def start_movement(self): self.robot_one.start() self.robot_two.start()
当然,您需要标记或其他东西来检查每个机器人是否尚未运行。