这是我的 Twisted Python 客户端。如果我能让它在没有服务器的情况下工作,我会的,但让它完全工作是很困难的。我真的不想将任何数据从其他服务器发送到客户端。我只是想让客户端将数据转储到服务器。发生的情况是,在发送任何数据之前,我必须输入“q”才能退出 while True 循环。我一直在尝试查找帧并找到了 LineReceiver,但我想我真的想要一个 LineSender。我怎样才能让这个程序真正发挥作用?请注意,如果您运行此程序,您的网络摄像头将打开,您将看到您的实时视频。在视频窗口中按“q”键退出。您需要提供在 localhost:3000 上运行的自己的服务器。我想我想要某种异步的东西,但我还没有弄清楚。主要是,我想用 python 将(非视频)从客户端流式传输到服务器。
import cv2
import mediapipe as mp
import mediapipe.python.solutions.drawing_styles as ds
import typing
import numpy as np
from twisted.internet import reactor, threads
from twisted.internet.protocol import Protocol, ClientFactory
from twisted.protocols.basic import LineReceiver
from queue import Queue
from twisted.internet.task import LoopingCall
from twisted.python import log
mp_hands = mp.solutions.hands.HandLandmark
hand = [[mp_hands.WRIST, "WRIST", "radiocarpal"],
[mp_hands.THUMB_CMC, "THUMB_CMC", "carpometacarpal_1"],
[mp_hands.THUMB_MCP, "THUMB_MCP", "metacarpophalangeal_1"],
[mp_hands.THUMB_IP, "THUMB_IP", "carpal_interphalangeal_1"],
[mp_hands.THUMB_TIP, "THUMB_TIP", "carpal_distal_phalanx_1"],
[mp_hands.INDEX_FINGER_MCP, "INDEX_FINGER_MCP", "metacarpophalangeal_2"],
[mp_hands.INDEX_FINGER_PIP, "INDEX_FINGER_PIP", "carpal_proximal_interphalangeal_2"],
[mp_hands.INDEX_FINGER_DIP, "INDEX_FINGER_DIP", "carpal_distal_interphalangeal_2"],
[mp_hands.INDEX_FINGER_TIP, "INDEX_FINGER_TIP", "carpal_distal_phalanx_2"],
[mp_hands.MIDDLE_FINGER_MCP, "MIDDLE_FINGER_MCP", "metacarpophalangeal_3"],
[mp_hands.MIDDLE_FINGER_PIP, "MIDDLE_FINGER_PIP", "carpal_proximal_interphalangeal_3"],
[mp_hands.MIDDLE_FINGER_DIP, "MIDDLE_FINGER_DIP", "carpal_distal_interphalangeal_3"],
[mp_hands.MIDDLE_FINGER_TIP, "MIDDLE_FINGER_TIP", "carpal_distal_phalanx_3"],
[mp_hands.RING_FINGER_MCP, "RING_FINGER_MCP", "metacarpophalangeal_4"],
[mp_hands.RING_FINGER_PIP, "RING_FINGER_PIP", "carpal_proximal_interphalangeal_4"],
[mp_hands.RING_FINGER_DIP, "RING_FINGER_DIP", "carpal_distal_interphalangeal_4"],
[mp_hands.RING_FINGER_TIP, "RING_FINGER_TIP", "carpal_distal_phalanx_4"],
[mp_hands.PINKY_MCP, "PINKY_MCP", "metacarpophalangeal_5"],
[mp_hands.PINKY_PIP, "PINKY_PIP", "carpal_proximal_interphalangeal_5"],
[mp_hands.PINKY_DIP, "PINKY_DIP", "carpal_distal_interphalangeal_5"],
[mp_hands.PINKY_TIP, "PINKY_TIP", "carpal_distal_phalanx_5"]]
# Load the MediaPipe Sign Language Detection model
mp_holistic = mp.solutions.holistic
holistic = mp_holistic.Holistic()
# Load the MediaPipe Drawing utils for visualization
mp_drawing = mp.solutions.drawing_utils
# load MedidPipe hands solutions
# Load the video file
# video_path = "path_to_video_file.mp4"
video_path = 0
cap = cv2.VideoCapture(video_path)
from twisted.internet import reactor, protocol
class Server(protocol.Protocol):
def dataReceived(self, data):
self.transport.write(data)
class ServerFactory(protocol.Factory):
def buildProtocol(self, addr):
return Server()
class ClientFactory(protocol.ClientFactory):
def buildProtocol(self, addr):
return Client()
def clientConnectionFailed(self, connector, reason):
print("Connection failed.")
reactor.stop()
def clientConnectionLost(self, connector, reason):
print("Connection lost.")
reactor.stop()
# sendMessages()
class Client(protocol.Protocol):
def startedEvent(self):
print('started event')
def performAnAction(self):
print('performed an action')
self.bufferSend()
# reactor.stop()
def connectionMade(self):
self.sendMessage("Marker: Start") # Send marker for the first frame
self.buffer = []
self.runRecognizer()
def sendMessage(self, message):
self.transport.write(message.encode())
# self.transport.write(message)
def bufferSend(self):
message = "\n".join(self.buffer)+"\n"
# print(message)
self.sendMessage(message)
self.buffer = []
def bufferMessage(self, message):
self.buffer.append(message);
def printHand(self, lmk, joint_string: str, frame):
self.bufferMessage(f"J:{joint_string}")
# print(lmk)
x = lmk.x
y = lmk.y
z = lmk.z
v = lmk.visibility
shape = frame.shape
self.bufferMessage(f"X:{x}")
self.bufferMessage(f"Y:{y}")
self.bufferMessage(f"Z:{z}")
relative_x = int(x * shape[1])
relative_y = int(y * shape[0])
relative_z = int(z * shape[2])
self.bufferMessage(f"XR:{relative_x}")
self.bufferMessage(f"YR:{relative_y}")
self.bufferMessage(f"ZR:{relative_z}")
cv2.putText(img=frame, text=joint_string, org=(relative_x, relative_y), fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.5, color=(255, 0, 0), thickness=1, lineType=cv2.LINE_AA)
def runRecognizer(self):
certainAmount = 5.0 # this is in seconds
self.bufferMessage("S:D")
while True:
if cap.isOpened():
retval, frame = cap.read()
if not retval:
break
# Convert the frame to RGB
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# Detect the signs in the frame
results = holistic.process(frame_rgb)
#signs = []
#if results.pose_landmarks:
# sign = results.pose_landmarks.landmark[mp_holistic.PoseLandmark.LEFT_ELBOW].visibility
# signs.append(sign)
for connection in mp_holistic.HAND_CONNECTIONS:
# print(connection)
self.bufferMessage(f"F:{connection[0]}")
self.bufferMessage(f"T:{connection[1]}")
if results.left_hand_landmarks:
mp_drawing.draw_landmarks(frame, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
for landmark in hand:
lmk = results.left_hand_landmarks.landmark[landmark[0]]
self.printHand(lmk, "l_"+landmark[2], frame)
if results.right_hand_landmarks:
mp_drawing.draw_landmarks(frame, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
for landmark in hand:
lmk = results.right_hand_landmarks.landmark[landmark[0]]
self.printHand(lmk, "r_"+landmark[2], frame)
if results.pose_landmarks:
mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)
for landmark in range(len(results.pose_landmarks.landmark)):
lmk = results.pose_landmarks.landmark[landmark]
self.printHand(lmk, str(landmark), frame)
# Display the frame
cv2.imshow("Sign Language Detection", frame)
# Break the loop if 'q' is pressed
if cv2.waitKey(1) & 0xFF == ord('q'):
break
reactor.callLater(certainAmount, self.performAnAction)
self.transport.loseConnection()
cap.release()
cv2.destroyAllWindows()
from twisted.internet import task
reactor.connectTCP('127.0.0.1', 3000, ClientFactory())
reactor.listenTCP(3002, ServerFactory())
reactor.run()
# # Convert the detected signs to English textual paragraphs
# textual_paragraphs = []
# current_paragraph = ""
# for sign in signs:
# if sign > 0.5:
# current_paragraph += " "
# else:
# current_paragraph += "."
# textual_paragraphs.append(current_paragraph)
# current_paragraph = ""
#
# # Print the English textual paragraphs
# for paragraph in textual_paragraphs:
# print(paragraph)
发生的事情是我必须在发送任何数据之前输入“q”退出 while True 循环
Twisted程序默认主要是单线程的。特别是,所有应用程序代码都将在单个线程中执行 - 与反应器一起。这是一个“协作多任务”系统 - 代码运行直到它放弃控制,然后其他人被调度并运行直到它放弃控制,等等。
while True
保持控制并防止其他任何事情发生,包括发送任何数据。
摆脱
while True
并以合作的方式安排循环的迭代 - 例如,使用 reactor.callLater
或可能稍高级别的 twisted.internet.task.LoopingCall
。
此外,请确保避免在
while
循环(以及其他任何地方)的主体中进行任何阻塞调用。例如,如果使用某些参数调用,cv2.waitKey
could会阻塞一段时间,这也会阻止应用程序代码的其余部分或反应器运行。我认为 cv2.waitKey(1)
可能或多或少是安全的,因为我认为 1 是
等待毫秒,但您应该确认这一点 - 并且您可能还想以非阻塞方式处理输入,例如使用 twisted.internet.stdio
。