我正在尝试使用 Python 中的多处理在 Raspberry Pi 中运行 Flask 服务器。使用多重处理的原因是我需要(同时)不断检查是否已达到边界(使用 opencv 和相机)以及从 PC 控制一辆小型汽车。
我用于控制电机的代码使用了线程并且按预期工作。然而,更新版本使用了多重处理,并且电机的行为就好像它们没有获得足够的动力来移动汽车一样。
原代码为:
import cv2
from picamera import PiCamera
import io
import threading
from flask import Flask, Response, jsonify
import RPi.GPIO as GPIO
from time import sleep
in1 = 23
in2 = 24
en_a = 25
in3 = 17
in4 = 27
en_b = 22
temp1 = 1
GPIO.setmode(GPIO.BCM)
GPIO.setup(in1,GPIO.OUT)
GPIO.setup(in2,GPIO.OUT)
GPIO.setup(en_a,GPIO.OUT)
GPIO.output(in1,GPIO.LOW)
GPIO.output(in2,GPIO.LOW)
GPIO.setup(in3,GPIO.OUT)
GPIO.setup(in4,GPIO.OUT)
GPIO.setup(en_b,GPIO.OUT)
GPIO.output(in3,GPIO.LOW)
GPIO.output(in4,GPIO.LOW)
power_a = GPIO.PWM(en_a,1000)
power_b = GPIO.PWM(en_b,1000)
power_a.start(1)
power_b.start(1)
app = Flask(__name__)
def capture_frame(camera):
success, frame = camera.read()
if not success:
print("failed to read frame")
return None
else:
ret, buffer = cv2.imencode('.jpg', frame)
if not ret:
return None
frame_bytes = buffer.tobytes()
return frame_bytes
def generate_image(camera):
if camera == "usb_camera":
camera = cv2.VideoCapture(1)
image = capture_frame(camera)
camera.release()
elif camera == "pi_camera":
camera = PiCamera()
camera.start_preview()
stream = io.BytesIO()
camera.capture(stream, format='jpeg')
stream.seek(0)
image = stream.read()
camera.stop_preview()
camera.close()
if image:
yield (image)
# The format below is to be able to see the image from a browser
# yield (b'--frame\r\n'
# b'Content-Type: image/jpeg\r\n\r\n' + image + b'\r\n')
@app.route('/video-usb')
def video_usb():
return Response(generate_image("usb_camera"), mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/video-raspi')
def video_raspi():
return Response(generate_image("pi_camera"), mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/move_forward', methods=['POST'])
def move_forward():
power_a.ChangeDutyCycle(50)
power_b.ChangeDutyCycle(50)
GPIO.output(in1,GPIO.LOW)
GPIO.output(in2,GPIO.HIGH)
GPIO.output(in3,GPIO.LOW)
GPIO.output(in4,GPIO.HIGH)
return jsonify({"message": "Car moving forward"})
@app.route('/move_backward', methods=['POST'])
def move_backward():
power_a.ChangeDutyCycle(50)
power_b.ChangeDutyCycle(50)
GPIO.output(in1,GPIO.HIGH)
GPIO.output(in2,GPIO.LOW)
GPIO.output(in3,GPIO.HIGH)
GPIO.output(in4,GPIO.LOW)
return jsonify({"message": "Car moving backwards"})
@app.route('/move_right', methods=['POST'])
def move_right():
power_a.ChangeDutyCycle(100)
power_b.ChangeDutyCycle(100)
GPIO.output(in1,GPIO.LOW)
GPIO.output(in2,GPIO.HIGH)
GPIO.output(in3,GPIO.HIGH)
GPIO.output(in4,GPIO.LOW)
return jsonify({"message": "Car turning right"})
@app.route('/move_left', methods=['POST'])
def move_left():
power_a.ChangeDutyCycle(100)
power_b.ChangeDutyCycle(100)
GPIO.output(in1,GPIO.HIGH)
GPIO.output(in2,GPIO.LOW)
GPIO.output(in3,GPIO.LOW)
GPIO.output(in4,GPIO.HIGH)
return jsonify({"message": "Car turning left"})
@app.route('/move_stop', methods=['POST'])
def move_stop():
power_a.start(1)
power_b.start(1)
GPIO.output(in1,GPIO.LOW)
GPIO.output(in2,GPIO.LOW)
GPIO.output(in3,GPIO.LOW)
GPIO.output(in4,GPIO.LOW)
return jsonify({"message": "Car stopped"})
def start_server():
app.run(host='192.168.100.36', port=8000, threaded=True)
if __name__ == '__main__':
server_thread = threading.Thread(target=start_server)
server_thread.daemon = True
server_thread.start()
server_thread.join()
修改后的版本使用多处理,我可以看到树莓派正在获取信号来启用电机(灯亮了,电机发出噪音),但就好像电机由于某种原因以非常低的功率工作。代码是:
import cv2
from picamera import PiCamera
import io
from flask import Flask, Response, jsonify
import RPi.GPIO as GPIO
from time import sleep
import multiprocessing
import numpy as np
in1 = 23
in2 = 24
en_a = 25
in3 = 17
in4 = 27
en_b = 22
temp1 = 1
GPIO.setmode(GPIO.BCM)
GPIO.setup(in1,GPIO.OUT)
GPIO.setup(in2,GPIO.OUT)
GPIO.setup(en_a,GPIO.OUT)
GPIO.output(in1,GPIO.LOW)
GPIO.output(in2,GPIO.LOW)
GPIO.setup(in3,GPIO.OUT)
GPIO.setup(in4,GPIO.OUT)
GPIO.setup(en_b,GPIO.OUT)
GPIO.output(in3,GPIO.LOW)
GPIO.output(in4,GPIO.LOW)
power_a = GPIO.PWM(en_a,1000)
power_b = GPIO.PWM(en_b,1000)
power_a.start(1)
power_b.start(1)
app = Flask(__name__)
def capture_frame(camera):
success, frame = camera.read()
if not success:
print("failed to read frame")
return None
else:
ret, buffer = cv2.imencode('.jpg', frame)
if not ret:
return None
frame_bytes = buffer.tobytes()
return frame_bytes
return None
def generate_image(camera):
if camera == "usb_camera":
camera = cv2.VideoCapture(1)
image = capture_frame(camera)
camera.release()
elif camera == "pi_camera":
camera = PiCamera()
camera.start_preview()
stream = io.BytesIO()
camera.capture(stream, format='jpeg')
stream.seek(0)
image = stream.read()
camera.stop_preview()
camera.close()
if image:
yield (image)
# The format below is to be able to see the image from a browser
# yield (b'--frame\r\n'
# b'Content-Type: image/jpeg\r\n\r\n' + image + b'\r\n')
@app.route('/video-usb')
def video_usb():
return Response(generate_image("usb_camera"), mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/video-raspi')
def video_raspi():
return Response(generate_image("pi_camera"), mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/move_forward', methods=['POST'])
def move_forward():
global local_allowed_movement
if local_allowed_movement.value == 'no restriction':
power_a.ChangeDutyCycle(50)
power_b.ChangeDutyCycle(50)
GPIO.output(in1,GPIO.LOW)
GPIO.output(in2,GPIO.HIGH)
GPIO.output(in3,GPIO.LOW)
GPIO.output(in4,GPIO.HIGH)
return jsonify({"message": "Car moving forward"})
@app.route('/move_backward', methods=['POST'])
def move_backward():
global local_allowed_movement
if local_allowed_movement.value == 'no restriction':
power_a.ChangeDutyCycle(50)
power_b.ChangeDutyCycle(50)
GPIO.output(in1,GPIO.HIGH)
GPIO.output(in2,GPIO.LOW)
GPIO.output(in3,GPIO.HIGH)
GPIO.output(in4,GPIO.LOW)
return jsonify({"message": "Car moving backwards"})
@app.route('/move_right', methods=['POST'])
def move_right():
global local_allowed_movement
if local_allowed_movement.value == 'no restriction' or local_allowed_movement.value == 'turn right':
power_a.ChangeDutyCycle(100)
power_b.ChangeDutyCycle(100)
GPIO.output(in1,GPIO.LOW)
GPIO.output(in2,GPIO.HIGH)
GPIO.output(in3,GPIO.HIGH)
GPIO.output(in4,GPIO.LOW)
return jsonify({"message": "Car turning right"})
@app.route('/move_left', methods=['POST'])
def move_left():
global local_allowed_movement
if local_allowed_movement.value == 'no restriction' or local_allowed_movement.value == 'turn left':
power_a.ChangeDutyCycle(100)
power_b.ChangeDutyCycle(100)
GPIO.output(in1,GPIO.HIGH)
GPIO.output(in2,GPIO.LOW)
GPIO.output(in3,GPIO.LOW)
GPIO.output(in4,GPIO.HIGH)
return jsonify({"message": "Car turning left"})
@app.route('/move_stop', methods=['POST'])
def move_stop():
power_a.start(1)
power_b.start(1)
GPIO.output(in1,GPIO.LOW)
GPIO.output(in2,GPIO.LOW)
GPIO.output(in3,GPIO.LOW)
GPIO.output(in4,GPIO.LOW)
return jsonify({"message": "Car stopped"})
def start_server(shared_allowed_movement):
global local_allowed_movement
local_allowed_movement = shared_allowed_movement
app.run(host='192.168.100.36', port=8000, threaded=True)
def get_image():
camera = PiCamera()
camera.start_preview()
stream = io.BytesIO()
camera.capture(stream, format='jpeg')
stream.seek(0)
image = stream.read()
camera.stop_preview()
camera.close()
image_bytes = np.frombuffer(image, dtype=np.uint8)
image = cv2.imdecode(image_bytes, cv2.IMREAD_COLOR)
return image
def turn_direction(hsv, shared_allowed_movement):
lower_blue = np.array([90, 50, 0], dtype = "uint8") # lower limit of blue color
upper_blue = np.array([120, 255, 255], dtype="uint8") # upper limit of blue color
height, width, depth = hsv.shape
width_cutoff = width // 2
left_side = hsv[:, :width_cutoff]
right_side = hsv[:, width_cutoff:]
mask_1 = cv2.inRange(left_side, lower_blue, upper_blue)
mask_2 = cv2.inRange(right_side, lower_blue, upper_blue)
left_pixel_count = np.sum(mask_1)
right_pixel_count = np.sum(mask_2)
if left_pixel_count > right_pixel_count:
shared_allowed_movement.value = 'turn right'
print('\nTURN RIGHT!!!!!!!!!!\n')
else:
shared_allowed_movement.value = 'turn left'
print('\nTURN LEFT!!!!!!!!!!\n')
def detect_boundary(hsv, shared_allowed_movement):
global allowed_movement
lower_blue = np.array([90, 50, 0], dtype = "uint8") # lower limit of blue color
upper_blue = np.array([120, 255, 255], dtype="uint8") # upper limit of blue color
mask = cv2.inRange(hsv, lower_blue, upper_blue)
edge_pixels = np.sum(mask)
if edge_pixels > 350:
turn_direction(hsv, shared_allowed_movement)
else:
shared_allowed_movement.value = 'no restriction'
def boundary_detection(shared_allowed_movement):
while (True):
image = get_image()
print("detecting if boundary has been reached")
hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
detect_boundary(hsv, shared_allowed_movement)
if __name__ == '__main__':
manager = multiprocessing.Manager()
shared_allowed_movement = manager.Value("c", "no restriction")
server_process = multiprocessing.Process(target=start_server, args=(shared_allowed_movement,))
boundary_detection_process = multiprocessing.Process(target=boundary_detection, args=(shared_allowed_movement,))
server_process.start()
boundary_detection_process.start()
server_process.join()
boundary_detection_process.join()
我尝试注释boundary_detection_process语句以仅运行电机控制,但电机行为保持完全相同。如果我再次运行旧脚本,它仍然按预期运行(电机获得预期功率)。
如果有人有任何想法我可以尝试,我将非常感激。
我有完全相同的问题,很想听听您是否能解决它?