I'm doing face recognition with landmarks on a webcam stream, using OpenCV and Dlib. The language is Python. It runs smoothly on my MacBook laptop, but I need it to run 24/7 on a desktop computer: a 32-bit PC with an Intel® Core™2 Quad CPU Q6600 @ 2.40GHz running Debian Jessie. The performance drop is dramatic: there is a 10-second delay due to the processing!
I therefore looked into multithreading to gain performance:
I took the facial-landmark code from the dlib sample code. I know it could probably be optimized, but what I want to understand is why I can't use the full power of my (old) computer with multithreading.
I'll put my code below; thanks a lot for reading :)
from __future__ import print_function

import numpy as np
import cv2
import dlib
from multiprocessing.pool import ThreadPool
from collections import deque

# common.py and video.py ship with the OpenCV Python samples
from common import clock, draw_str, StatValue
import video


class DummyTask:
    def __init__(self, data):
        self.data = data

    def ready(self):
        return True

    def get(self):
        return self.data


if __name__ == '__main__':
    import sys

    print(__doc__)

    try:
        fn = sys.argv[1]
    except IndexError:
        fn = 0
    cap = video.create_capture(fn)

    # Face detector
    detector = dlib.get_frontal_face_detector()

    # Landmarks shape predictor
    predictor = dlib.shape_predictor("landmarks/shape_predictor_68_face_landmarks.dat")

    # This is where the facial detection takes place
    def process_frame(frame, t0, detector, predictor):
        # some intensive computation...
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        clahe_image = clahe.apply(gray)
        detections = detector(clahe_image, 1)
        for k, d in enumerate(detections):
            shape = predictor(clahe_image, d)
            for i in range(68):  # draw all 68 landmark points (0..67)
                cv2.circle(frame, (shape.part(i).x, shape.part(i).y), 1, (0, 0, 255), thickness=2)
        return frame, t0

    threadn = cv2.getNumberOfCPUs()
    pool = ThreadPool(processes=threadn)
    pending = deque()

    threaded_mode = True

    latency = StatValue()
    frame_interval = StatValue()
    last_frame_time = clock()
    while True:
        while len(pending) > 0 and pending[0].ready():
            res, t0 = pending.popleft().get()
            latency.update(clock() - t0)
            draw_str(res, (20, 20), "threaded : " + str(threaded_mode))
            draw_str(res, (20, 40), "latency : %.1f ms" % (latency.value * 1000))
            draw_str(res, (20, 60), "frame interval : %.1f ms" % (frame_interval.value * 1000))
            cv2.imshow('threaded video', res)
        if len(pending) < threadn:
            ret, frame = cap.read()
            t = clock()
            frame_interval.update(t - last_frame_time)
            last_frame_time = t
            if threaded_mode:
                task = pool.apply_async(process_frame, (frame.copy(), t, detector, predictor))
            else:
                task = DummyTask(process_frame(frame, t, detector, predictor))
            pending.append(task)
        ch = cv2.waitKey(1)
        if ch == ord(' '):
            threaded_mode = not threaded_mode
        if ch == 27:
            break
    cv2.destroyAllWindows()
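(A minimal diagnostic sketch, not part of the program above: it reuses the same cap, detector and predictor and only adds a timing wrapper. Timing the detector and predictor calls separately usually shows that dlib's HOG detector dominates the per-frame cost, which is useful to know before reaching for threads.)

import time

ret, frame = cap.read()
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

t0 = time.time()
detections = detector(gray, 1)      # typically the dominant cost
t1 = time.time()
for d in detections:
    shape = predictor(gray, d)      # cheap by comparison
t2 = time.time()
print("detector : %.1f ms" % ((t1 - t0) * 1000))
print("predictor: %.1f ms" % ((t2 - t1) * 1000))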
The performance problem was caused by a bad compilation of dlib. Do not use

pip install dlib

which for some reason runs very slowly compared to a properly compiled build. This took my latency from almost 10 seconds down to about 2 seconds, so in the end I didn't need multithreading/multiprocessing, though I'm still working on improving the speed further. Thanks for the help :)
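(For reference, a build sketch rather than the exact commands from this answer: dlib exposes CMake switches such as USE_SSE2_INSTRUCTIONS, USE_SSE4_INSTRUCTIONS and USE_AVX_INSTRUCTIONS, and older setup.py versions forwarded them with --yes; the exact syntax has changed across dlib releases. A Q6600 has SSE but no AVX, so the SSE2 switch is the safe choice here.)

git clone https://github.com/davisking/dlib.git
cd dlib
python setup.py install --yes USE_SSE2_INSTRUCTIONS
# on newer CPUs, USE_SSE4_INSTRUCTIONS or USE_AVX_INSTRUCTIONS gives a further speedup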
I tried a simplified approach like the one P.Ro mentions in his answer, with processes writing to an output queue, but somehow the queue got locked most of the time because all the processes were writing to it at once. (Just my guess; I was probably doing something wrong.)
In the end I used pipes.
The code is nasty, but if I were me from a few hours ago, I would still be happy to find an example that actually runs without effort.
from multiprocessing import Process, Queue, Manager, Pipe
import multiprocessing
import face_recognition as fik
import cv2
import time


video_input = 0

obama_image = fik.load_image_file("obama.png")
obama_face_encoding = fik.face_encodings(obama_image)[0]

quality = 0.7


def f(id, fi, fl):
    import face_recognition as fok
    while True:
        small_frame = fi.get()
        print("running thread " + str(id))
        face_locations = fok.face_locations(small_frame)
        if len(face_locations) > 0:
            print(face_locations)
            for (top7, right7, bottom7, left7) in face_locations:
                # send the face crop back to the main process over the pipe
                small_frame_c = small_frame[top7:bottom7, left7:right7]
                fl.send(small_frame_c)


fps_var = 0
if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    # global megaman
    with Manager() as manager:
        video_capture = cv2.VideoCapture(video_input)

        fi = Queue(maxsize=14)

        threads = 8
        proc = []
        parent_p = []
        thread_p = []
        # procids = range(0,threads)

        for t in range(0, threads):
            p_t, c_t = Pipe()
            parent_p.append(p_t)
            thread_p.append(c_t)
            print(t)
            proc.append(Process(target=f, args=(t, fi, thread_p[t])))
            proc[t].start()

        useframe = False

        frame_id = 0
        while True:
            # Grab a single frame of video
            ret, frame = video_capture.read()
            effheight, effwidth = frame.shape[:2]
            if effwidth < 20:
                break
            # Resize frame of video for faster face recognition processing
            xxx = 930
            yyy = 10 / 16  # 0.625
            small_frame = cv2.resize(frame, (xxx, int(xxx * yyy)))
            if frame_id % 2 == 0:
                if not fi.full():
                    fi.put(small_frame)

            print(frame_id)
            cv2.imshow('Video', small_frame)

            print("FPS: ", int(1.0 / (time.time() - fps_var)))
            fps_var = time.time()

            # GET ALL DETECTIONS
            for t in range(0, threads):
                if parent_p[t].poll():
                    small_frame_c = parent_p[t].recv()
                    cv2.imshow('recc', small_frame_c)
                    height34, width34 = small_frame_c.shape[:2]
                    if width34 < 20:
                        print("face 2 small")
                        print(width34)
                        break
                    # note: the crop is BGR; face_recognition expects RGB, which may cost some accuracy
                    face_encodings_cam = fik.face_encodings(small_frame_c, [(0, width34, height34, 0)])

                    match = fik.compare_faces([obama_face_encoding], face_encodings_cam[0])
                    name = "Unknown"

                    if match[0]:
                        name = "Barack"

                    print(name)
                    break

            frame_id += 1

            # Hit 'q' on the keyboard to quit!
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
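(For completeness, a minimal sketch, not from this answer, of the shared-output-queue pattern that locked up for me. multiprocessing.Queue is documented as safe for multiple producers, so with a bounded size and non-blocking puts the pattern itself should hold up; the frame and crop strings here are stand-ins for real frames and face crops.)

from multiprocessing import Process, Queue
import queue

def worker(wid, frames, results):
    while True:
        frame = frames.get()
        if frame is None:                   # sentinel: shut down cleanly
            break
        crop = "crop-from-worker-%d" % wid  # stand-in for face_locations(frame)
        try:
            results.put_nowait(crop)        # drop instead of blocking when full
        except queue.Full:
            pass

if __name__ == '__main__':
    frames, results = Queue(maxsize=14), Queue(maxsize=14)
    workers = [Process(target=worker, args=(i, frames, results)) for i in range(4)]
    for w in workers:
        w.start()
    for i in range(20):
        frames.put("frame-%d" % i)
    for _ in workers:
        frames.put(None)                    # one sentinel per worker
    for w in workers:
        w.join()
    while not results.empty():
        print(results.get())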
I don't have much experience with ThreadPool, but I always just use Process, as shown below. You should be able to easily edit this code to fit your needs; I wrote it with your implementation in mind.
This code will get the number of cores and start that many worker processes, which all implement the desired function in parallel. They all share one frame queue for input and all put into the same output queue for the main process to get and display. Each queue has a maximum size, in this case 5; this ensures that, despite the CPU time processing takes, the frames being worked on are always relatively recent.
import numpy as np
import cv2
from multiprocessing import Process, Queue
import time

# from common import clock, draw_str, StatValue
# import video


class Canny_Process(Process):

    def __init__(self, frame_queue, output_queue):
        Process.__init__(self)
        self.frame_queue = frame_queue
        self.output_queue = output_queue
        self.stop = False
        # Initialize your face detectors here

    def get_frame(self):
        if not self.frame_queue.empty():
            return True, self.frame_queue.get()
        else:
            return False, None

    def stopProcess(self):
        self.stop = True

    def canny_frame(self, frame):
        # some intensive computation...
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 100)

        # To simulate CPU time
        #############################
        for i in range(1000000):
            x = 546 * 546
            res = x / (i + 1)
        #############################
        'REPLACE WITH FACE DETECT CODE HERE'

        if self.output_queue.full():
            self.output_queue.get_nowait()
        self.output_queue.put(edges)

    def run(self):
        while not self.stop:
            ret, frame = self.get_frame()
            if ret:
                self.canny_frame(frame)


if __name__ == '__main__':

    frame_sum = 0
    init_time = time.time()

    def put_frame(frame):
        if Input_Queue.full():
            Input_Queue.get_nowait()
        Input_Queue.put(frame)

    def cap_read(cv2_cap):
        ret, frame = cv2_cap.read()
        if ret:
            put_frame(frame)

    cap = cv2.VideoCapture(0)

    threadn = cv2.getNumberOfCPUs()
    threaded_mode = True

    process_list = []
    Input_Queue = Queue(maxsize=5)
    Output_Queue = Queue(maxsize=5)

    for x in range((threadn - 1)):
        canny_process = Canny_Process(frame_queue=Input_Queue, output_queue=Output_Queue)
        canny_process.daemon = True
        canny_process.start()
        process_list.append(canny_process)

    ch = cv2.waitKey(1)
    cv2.namedWindow('Threaded Video', cv2.WINDOW_NORMAL)
    while True:
        cap_read(cap)

        if not Output_Queue.empty():
            result = Output_Queue.get()
            cv2.imshow('Threaded Video', result)
            ch = cv2.waitKey(5)

        if ch == ord(' '):
            threaded_mode = not threaded_mode
        if ch == 27:
            break
    cv2.destroyAllWindows()
This should do the trick; just change my Canny function to your face-detection code. I wrote this based on your code and compared the two, and this is significantly faster. I am using multiprocessing.Process here: in Python, processes are truly parallel, whereas threads are not quite, because of the GIL. I use 2 queues to send data back and forth between the main process and the workers; queues are both thread and process safe.
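(A minimal sketch, not part of the answer above, illustrating the GIL point: the same pure-Python CPU-bound function mapped over a thread pool and then a process pool. On a multi-core machine only the second run should scale.)

from multiprocessing.pool import ThreadPool
from multiprocessing import Pool
import time

def burn(n):
    # pure-Python CPU-bound work; a thread holds the GIL the whole time
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == '__main__':
    jobs = [2000000] * 8

    with ThreadPool(4) as tp:
        t0 = time.time()
        tp.map(burn, jobs)
        print("threads  : %.2f s" % (time.time() - t0))   # serialized by the GIL

    with Pool(4) as pp:
        t0 = time.time()
        pp.map(burn, jobs)
        print("processes: %.2f s" % (time.time() - t0))   # truly parallel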
You can use this, multithreaded:
from imutils.video import VideoStream

# Initialize the threaded video stream.
frameSize = (1280, 720)  # fill in your stream's resolution
videostream = "rtsp://192.168.x.y/user=admin=xxxxxxx_channel=vvvv=1.sdp?params"
vs = VideoStream(src=videostream, resolution=frameSize,
                 framerate=32).start()

frame = vs.read()
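(A small usage sketch under the same setup: VideoStream grabs frames on a background thread, so vs.read() in the main loop always returns the most recent frame instead of blocking on the camera.)

import cv2

while True:
    frame = vs.read()
    if frame is None:        # stream dropped or not ready yet
        break
    cv2.imshow("Frame", frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

vs.stop()
cv2.destroyAllWindows()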
This implementation leverages multiprocessing and is based on the face_recognition examples. It works by buffering frames, which are then processed across multiple cores. The parallel processing is achieved with concurrent.futures.ProcessPoolExecutor(). You can adjust amountOfFrameBuffer to your liking, with the number of available CPU cores as the sensible maximum. I hope you find this approach useful.

from collections import deque
import concurrent.futures
import face_recognition
import cv2
import numpy as np

# -- Modify these values (maximum is your core count) --
amountOfFrameBuffer = 20
# max quality -> 1
scaleDownFactorForFasterProcessing = 4
# -- END --


def setup_face_recognition():
    # Load a sample picture and learn how to recognize it.
    obama_image = face_recognition.load_image_file("obama.jpg")
    obama_face_encoding = face_recognition.face_encodings(obama_image)[0]

    # Load a second sample picture and learn how to recognize it.
    biden_image = face_recognition.load_image_file("biden.jpg")
    biden_face_encoding = face_recognition.face_encodings(biden_image)[0]

    # Create arrays of known face encodings and their names
    known_face_encodings = [
        obama_face_encoding,
        biden_face_encoding
    ]
    known_face_names = [
        "Barack Obama",
        "Joe Biden"
    ]
    return known_face_encodings, known_face_names


def processFrame(frame, known_face_encodings, known_face_names):
    global scaleDownFactorForFasterProcessing

    # Initialize some variables
    face_locations = []
    face_encodings = []
    face_names = []

    # Resize frame of video for faster face recognition processing
    resizeValue = 1 / scaleDownFactorForFasterProcessing
    small_frame = cv2.resize(frame, (0, 0), fx=resizeValue, fy=resizeValue)

    # Convert the image from BGR color (which OpenCV uses) to RGB color (which face_recognition uses)
    rgb_small_frame = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)

    # Find all the faces and face encodings in the current frame of video
    face_locations = face_recognition.face_locations(rgb_small_frame)
    face_encodings = face_recognition.face_encodings(rgb_small_frame, face_locations)

    face_names = []
    for face_encoding in face_encodings:
        # See if the face is a match for the known face(s)
        matches = face_recognition.compare_faces(known_face_encodings, face_encoding)
        name = "Unknown"

        # # If a match was found in known_face_encodings, just use the first one.
        # if True in matches:
        #     first_match_index = matches.index(True)
        #     name = known_face_names[first_match_index]

        # Or instead, use the known face with the smallest distance to the new face
        face_distances = face_recognition.face_distance(known_face_encodings, face_encoding)
        best_match_index = np.argmin(face_distances)
        if matches[best_match_index]:
            name = known_face_names[best_match_index]

        face_names.append(name)

    return face_locations, face_names


def displayResults(frame, face_locations, face_names):
    global scaleDownFactorForFasterProcessing

    # Display the results
    for (top, right, bottom, left), name in zip(face_locations, face_names):
        # Scale back up face locations since the frame we detected in was scaled down
        top *= scaleDownFactorForFasterProcessing
        right *= scaleDownFactorForFasterProcessing
        bottom *= scaleDownFactorForFasterProcessing
        left *= scaleDownFactorForFasterProcessing

        # Draw a box around the face
        cv2.rectangle(frame, (left, top), (right, bottom), (0, 0, 255), 2)

        # Draw a label with a name below the face
        cv2.rectangle(frame, (left, bottom - 35), (right, bottom), (0, 0, 255), cv2.FILLED)
        font = cv2.FONT_HERSHEY_DUPLEX
        cv2.putText(frame, name, (left + 6, bottom - 6), font, 1.0, (255, 255, 255), 1)

    # Display the resulting image
    cv2.imshow('Video', frame)
    return


def main():
    global amountOfFrameBuffer

    # Get a reference to webcam #0 (the default one)
    video_capture = cv2.VideoCapture(0)
    # Set the FPS (for example, 30 FPS)
    video_capture.set(cv2.CAP_PROP_FPS, 30)

    # setup data
    known_face_encodings, known_face_names = setup_face_recognition()

    # Initialize some variables for subprocess management
    processQueue = deque()
    frameQueue = deque()
    initial = True

    with concurrent.futures.ProcessPoolExecutor() as executer:
        while True:
            # initial fill-up of the frame / process buffer / queue
            if initial:
                # read a frame amountOfFrameBuffer times and start the processes
                for _ in range(amountOfFrameBuffer - 1):
                    # Grab a single frame of video
                    ret, frame = video_capture.read()
                    # start process
                    process = executer.submit(processFrame, frame, known_face_encodings, known_face_names)
                    processQueue.append(process)
                    # add frame to frameQueue
                    frameQueue.append(frame)
                initial = False
            else:
                # normal run
                # Grab a single frame of video
                ret, frame = video_capture.read()
                # start process for this frame and add to queue
                process = executer.submit(processFrame, frame, known_face_encodings, known_face_names)
                processQueue.append(process)
                # add frame to frameQueue
                frameQueue.append(frame)

                # display next processed frame
                nextFrameProcess = processQueue.popleft()
                face_locations, face_names = nextFrameProcess.result()
                frameMatchingToTheProcess = frameQueue.popleft()
                displayResults(frameMatchingToTheProcess, face_locations, face_names)

            # Hit 'q' on the keyboard to quit!
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    # Release handle to the webcam
    video_capture.release()
    cv2.destroyAllWindows()
    return


if __name__ == '__main__':
    main()
    print("Program ended")