Raspberry Pi 4B 的多处理和电机功率问题

问题描述 投票:0回答:1

我正在尝试使用 Python 中的多处理在 Raspberry Pi 中运行 Flask 服务器。使用多重处理的原因是我需要(同时)不断检查是否已达到边界(使用 opencv 和相机)以及从 PC 控制一辆小型汽车。

我用于控制电机的代码使用了线程并且按预期工作。然而,更新版本使用了多重处理,并且电机的行为就好像它们没有获得足够的动力来移动汽车一样。

原代码为:


import cv2
from picamera import PiCamera
import io
import threading
from flask import Flask, Response, jsonify
import RPi.GPIO as GPIO          
from time import sleep


in1 = 23
in2 = 24
en_a = 25
in3 = 17
in4 = 27
en_b = 22
temp1 = 1

GPIO.setmode(GPIO.BCM)

GPIO.setup(in1,GPIO.OUT)
GPIO.setup(in2,GPIO.OUT)
GPIO.setup(en_a,GPIO.OUT)
GPIO.output(in1,GPIO.LOW)
GPIO.output(in2,GPIO.LOW)

GPIO.setup(in3,GPIO.OUT)
GPIO.setup(in4,GPIO.OUT)
GPIO.setup(en_b,GPIO.OUT)
GPIO.output(in3,GPIO.LOW)
GPIO.output(in4,GPIO.LOW)

power_a = GPIO.PWM(en_a,1000)
power_b = GPIO.PWM(en_b,1000)

power_a.start(1)
power_b.start(1)


app = Flask(__name__)


def capture_frame(camera):
    success, frame = camera.read()
    if not success:
        print("failed to read frame")
        return None
    else:
        ret, buffer = cv2.imencode('.jpg', frame)
        if not ret:
            return None
        frame_bytes = buffer.tobytes()
        return frame_bytes
    
    
def generate_image(camera):
    if camera == "usb_camera":
        camera = cv2.VideoCapture(1)
        image = capture_frame(camera)
        camera.release()
    elif camera == "pi_camera":
        camera = PiCamera()
        camera.start_preview()
        stream = io.BytesIO()
        camera.capture(stream, format='jpeg')
        stream.seek(0)
        image = stream.read()
        camera.stop_preview()
        camera.close()
    if image:
        yield (image)
        # The format below is to be able to see the image from a browser
        # yield (b'--frame\r\n'
        #        b'Content-Type: image/jpeg\r\n\r\n' + image + b'\r\n')
            

@app.route('/video-usb')
def video_usb():
    return Response(generate_image("usb_camera"), mimetype='multipart/x-mixed-replace; boundary=frame')


@app.route('/video-raspi')
def video_raspi():
    return Response(generate_image("pi_camera"), mimetype='multipart/x-mixed-replace; boundary=frame')
    
    
@app.route('/move_forward', methods=['POST'])
def move_forward():
    power_a.ChangeDutyCycle(50)
    power_b.ChangeDutyCycle(50)
    GPIO.output(in1,GPIO.LOW)
    GPIO.output(in2,GPIO.HIGH)
    GPIO.output(in3,GPIO.LOW)
    GPIO.output(in4,GPIO.HIGH)
    return jsonify({"message": "Car moving forward"})
    
    
@app.route('/move_backward', methods=['POST'])
def move_backward():
    power_a.ChangeDutyCycle(50)
    power_b.ChangeDutyCycle(50)
    GPIO.output(in1,GPIO.HIGH)
    GPIO.output(in2,GPIO.LOW)
    GPIO.output(in3,GPIO.HIGH)
    GPIO.output(in4,GPIO.LOW)
    return jsonify({"message": "Car moving backwards"})
    
    
@app.route('/move_right', methods=['POST'])
def move_right():
    power_a.ChangeDutyCycle(100)
    power_b.ChangeDutyCycle(100)
    GPIO.output(in1,GPIO.LOW)
    GPIO.output(in2,GPIO.HIGH)
    GPIO.output(in3,GPIO.HIGH)
    GPIO.output(in4,GPIO.LOW)
    return jsonify({"message": "Car turning right"})
    
    
@app.route('/move_left', methods=['POST'])
def move_left():
    power_a.ChangeDutyCycle(100)
    power_b.ChangeDutyCycle(100)
    GPIO.output(in1,GPIO.HIGH)
    GPIO.output(in2,GPIO.LOW)
    GPIO.output(in3,GPIO.LOW)
    GPIO.output(in4,GPIO.HIGH)
    return jsonify({"message": "Car turning left"})
    

@app.route('/move_stop', methods=['POST'])
def move_stop():
    power_a.start(1)
    power_b.start(1)
    GPIO.output(in1,GPIO.LOW)
    GPIO.output(in2,GPIO.LOW)
    GPIO.output(in3,GPIO.LOW)
    GPIO.output(in4,GPIO.LOW)
    return jsonify({"message": "Car stopped"})


def start_server():
    app.run(host='192.168.100.36', port=8000, threaded=True)


if __name__ == '__main__':
    server_thread = threading.Thread(target=start_server)
    server_thread.daemon = True
    server_thread.start()
    server_thread.join()

修改后的版本使用多处理,我可以看到树莓派正在获取信号来启用电机(灯亮了,电机发出噪音),但就好像电机由于某种原因以非常低的功率工作。代码是:

import cv2
from picamera import PiCamera
import io
from flask import Flask, Response, jsonify
import RPi.GPIO as GPIO          
from time import sleep

import multiprocessing
import numpy as np


in1 = 23
in2 = 24
en_a = 25
in3 = 17
in4 = 27
en_b = 22
temp1 = 1

GPIO.setmode(GPIO.BCM)

GPIO.setup(in1,GPIO.OUT)
GPIO.setup(in2,GPIO.OUT)
GPIO.setup(en_a,GPIO.OUT)
GPIO.output(in1,GPIO.LOW)
GPIO.output(in2,GPIO.LOW)

GPIO.setup(in3,GPIO.OUT)
GPIO.setup(in4,GPIO.OUT)
GPIO.setup(en_b,GPIO.OUT)
GPIO.output(in3,GPIO.LOW)
GPIO.output(in4,GPIO.LOW)

power_a = GPIO.PWM(en_a,1000)
power_b = GPIO.PWM(en_b,1000)

power_a.start(1)
power_b.start(1)


app = Flask(__name__)


def capture_frame(camera):
    success, frame = camera.read()
    if not success:
        print("failed to read frame")
        return None
    else:
        ret, buffer = cv2.imencode('.jpg', frame)
        if not ret:
            return None
        frame_bytes = buffer.tobytes()
        return frame_bytes
    return None
    
    
def generate_image(camera):
    if camera == "usb_camera":
        camera = cv2.VideoCapture(1)
        image = capture_frame(camera)
        camera.release()
    elif camera == "pi_camera":
        camera = PiCamera()
        camera.start_preview()
        stream = io.BytesIO()
        camera.capture(stream, format='jpeg')
        stream.seek(0)
        image = stream.read()
        camera.stop_preview()
        camera.close()
    if image:
        yield (image)
        # The format below is to be able to see the image from a browser
        # yield (b'--frame\r\n'
        #        b'Content-Type: image/jpeg\r\n\r\n' + image + b'\r\n')
            

@app.route('/video-usb')
def video_usb():
    return Response(generate_image("usb_camera"), mimetype='multipart/x-mixed-replace; boundary=frame')


@app.route('/video-raspi')
def video_raspi():
    return Response(generate_image("pi_camera"), mimetype='multipart/x-mixed-replace; boundary=frame')
    
    
@app.route('/move_forward', methods=['POST'])
def move_forward():
    global local_allowed_movement
    
    if local_allowed_movement.value == 'no restriction':
        power_a.ChangeDutyCycle(50)
        power_b.ChangeDutyCycle(50)
        GPIO.output(in1,GPIO.LOW)
        GPIO.output(in2,GPIO.HIGH)
        GPIO.output(in3,GPIO.LOW)
        GPIO.output(in4,GPIO.HIGH)
        return jsonify({"message": "Car moving forward"})
    
    
@app.route('/move_backward', methods=['POST'])
def move_backward():
    global local_allowed_movement
    
    if local_allowed_movement.value == 'no restriction':
        power_a.ChangeDutyCycle(50)
        power_b.ChangeDutyCycle(50)
        GPIO.output(in1,GPIO.HIGH)
        GPIO.output(in2,GPIO.LOW)
        GPIO.output(in3,GPIO.HIGH)
        GPIO.output(in4,GPIO.LOW)
        return jsonify({"message": "Car moving backwards"})
    
    
@app.route('/move_right', methods=['POST'])
def move_right():
    global local_allowed_movement
    
    if local_allowed_movement.value == 'no restriction' or local_allowed_movement.value == 'turn right':
        power_a.ChangeDutyCycle(100)
        power_b.ChangeDutyCycle(100)
        GPIO.output(in1,GPIO.LOW)
        GPIO.output(in2,GPIO.HIGH)
        GPIO.output(in3,GPIO.HIGH)
        GPIO.output(in4,GPIO.LOW)
        return jsonify({"message": "Car turning right"})
    
    
@app.route('/move_left', methods=['POST'])
def move_left():
    global local_allowed_movement
    
    if local_allowed_movement.value == 'no restriction' or local_allowed_movement.value == 'turn left':
        power_a.ChangeDutyCycle(100)
        power_b.ChangeDutyCycle(100)
        GPIO.output(in1,GPIO.HIGH)
        GPIO.output(in2,GPIO.LOW)
        GPIO.output(in3,GPIO.LOW)
        GPIO.output(in4,GPIO.HIGH)
        return jsonify({"message": "Car turning left"})
    

@app.route('/move_stop', methods=['POST'])
def move_stop():
    power_a.start(1)
    power_b.start(1)
    GPIO.output(in1,GPIO.LOW)
    GPIO.output(in2,GPIO.LOW)
    GPIO.output(in3,GPIO.LOW)
    GPIO.output(in4,GPIO.LOW)
    return jsonify({"message": "Car stopped"})


def start_server(shared_allowed_movement):
    global local_allowed_movement
    local_allowed_movement = shared_allowed_movement
    
    app.run(host='192.168.100.36', port=8000, threaded=True)


def get_image():
    camera = PiCamera()
    camera.start_preview()
    stream = io.BytesIO()
    camera.capture(stream, format='jpeg')
    stream.seek(0)
    image = stream.read()
    camera.stop_preview()
    camera.close()
    image_bytes = np.frombuffer(image, dtype=np.uint8)
    image = cv2.imdecode(image_bytes, cv2.IMREAD_COLOR)
    return image

    
def turn_direction(hsv, shared_allowed_movement):
    lower_blue = np.array([90, 50, 0], dtype = "uint8") # lower limit of blue color
    upper_blue = np.array([120, 255, 255], dtype="uint8") # upper limit of blue color
    height, width, depth = hsv.shape
    width_cutoff = width // 2
    left_side = hsv[:, :width_cutoff]    
    right_side = hsv[:, width_cutoff:]
    mask_1 = cv2.inRange(left_side, lower_blue, upper_blue)
    mask_2 = cv2.inRange(right_side, lower_blue, upper_blue)
    left_pixel_count = np.sum(mask_1)
    right_pixel_count = np.sum(mask_2)
    if left_pixel_count > right_pixel_count:
        shared_allowed_movement.value = 'turn right'
        print('\nTURN RIGHT!!!!!!!!!!\n')
    else:
        shared_allowed_movement.value = 'turn left'
        print('\nTURN LEFT!!!!!!!!!!\n')


def detect_boundary(hsv, shared_allowed_movement):
    global allowed_movement

    lower_blue = np.array([90, 50, 0], dtype = "uint8") # lower limit of blue color
    upper_blue = np.array([120, 255, 255], dtype="uint8") # upper limit of blue color
    mask = cv2.inRange(hsv, lower_blue, upper_blue)
    edge_pixels = np.sum(mask)
    if edge_pixels > 350:
        turn_direction(hsv, shared_allowed_movement)
    else:
        shared_allowed_movement.value = 'no restriction'


def boundary_detection(shared_allowed_movement):   
    while (True):
        image = get_image() 
        print("detecting if boundary has been reached")
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        detect_boundary(hsv, shared_allowed_movement)


if __name__ == '__main__':    
    manager = multiprocessing.Manager()
    shared_allowed_movement = manager.Value("c", "no restriction")
    
    server_process = multiprocessing.Process(target=start_server, args=(shared_allowed_movement,))
    boundary_detection_process = multiprocessing.Process(target=boundary_detection, args=(shared_allowed_movement,))
    
    server_process.start()
    boundary_detection_process.start()
    
    server_process.join()
    boundary_detection_process.join()

我尝试注释boundary_detection_process语句以仅运行电机控制,但电机行为保持完全相同。如果我再次运行旧脚本,它仍然按预期运行(电机获得预期功率)。

如果有人有任何想法我可以尝试,我将非常感激。

python-3.x python-multiprocessing raspberry-pi4
1个回答
0
投票

我有完全相同的问题,很想听听您是否能解决它?

© www.soinside.com 2019 - 2024. All rights reserved.