我遇到一个问题,我试图在后台触发 Python 脚本 最多可插入四个 USB 设备,所有设备均具有相同的 USB 供应商和产品 ID。
为了实现此目的,我在
/etc/udev/rules.d/myrule.rules
中创建了一条 udev 规则,如下所示:
ACTION=="add", SUBSYSTEM=="usb", ATTR{idVendor}=="1234", ATTR{idProduct}=="5678", ENV{SYSTEMD_WANTS}="usb-trigger.service"
1234 和 5689 只是示例。
此外,我在
/etc/systemd/system/usb-trigger.service
中设置了一个 systemd 服务,配置如下:
[Unit]
Description=USB Trigger Service
[Service]
Type=oneshot
ExecStart=/path/to/my/python/script/uart.py
[Install]
WantedBy=multi-user.target
但是,脚本有时执行,有时不执行。此外,如果我插入多个第一个设备或第二个设备,后台脚本将无法启动。有时,所有设备都无法按预期工作。
我的 Python 脚本的主要目标是在插入 USB 设备时生成 CSV 文件,保存数据,并在拔下设备时关闭文件。我需要这个脚本来处理最多四个设备。
这是我正在使用的Python脚本:
python
import serial
import serial.tools.list_ports
import time
import csv
import datetime
import sys
def get_current_time():
current_time = time.time()
milliseconds = int((current_time - int(current_time)) * 1000)
formatted_time = time.strftime("%H:%M:%S", time.localtime(current_time))
formatted_time += f".{milliseconds:03d}" # Adding milliseconds
return formatted_time
def write_to_csv(writer, data):
try:
writer.writerow(data)
except Exception as e:
print(f"Error writing to CSV: {e}")
def find_com_port():
try:
com_ports = serial.tools.list_ports.comports()
for port in com_ports:
if port.device.startswith('/dev/ttyUSB') or port.device.startswith('/dev/ttyACM'):
return port.device
except Exception as e:
print(f"Error finding COM port: {e}")
def main():
try:
while True:
com_port = find_com_port()
if com_port is not None:
print(f"COM port found: {com_port}")
break # Exit the loop once a COM port is found
else:
print("No COM port found. Retrying...")
time.sleep(1)
if com_port is None:
print("No COM port found.")
return
filename = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + ".csv"
bottle_id = None
bottle_id_collected = False # Flag to track if bottle ID has been collected
with serial.Serial(port=com_port, baudrate=460800, timeout=2) as ser, open(filename, 'w', newline='') as file:
print(f"Connected to {com_port} successfully.")
writer = csv.writer(file)
while True:
received_bytes = ser.readline().decode('ascii', errors='replace').strip()
if received_bytes:
if not bottle_id_collected and 'Bottle ID' in received_bytes:
bottle_id = received_bytes.split('Bottle ID')[1].strip()
writer.writerow(['Bottle ID', bottle_id])
writer.writerow(['Input 1', 'Input 2', 'Input 3', 'time stamp'])
bottle_id_collected = True
else:
parts = received_bytes.split(', ')
try:
numbers = [float(part) for part in parts]
data_row = numbers + [get_current_time()]
writer.writerow(data_row)
print(f"Writing to CSV: {data_row}")
file.flush() # Force flushing
except ValueError as e:
pass
time.sleep(0.01)
except serial.SerialException as e:
print(f"Error opening serial port {com_port}: {e}")
except KeyboardInterrupt:
print("Program terminated by user.")
except Exception as e:
print(f"An error occurred: {e}")
finally:
try:
if 'ser' in locals() and ser.is_open:
ser.close()
except Exception as e:
print(f"Error closing serial port: {e}")
try:
if 'file' in locals() and not file.closed:
file.close()
except Exception as e:
print(f"Error closing CSV file: {e}")
sys.exit()
if __name__ == "__main__":
main()
任何关于脚本行为不一致的原因以及我如何确保它在插入多个 USB 设备时可靠运行的见解将不胜感激。谢谢!
我配置了 udev 规则和 systemd 服务,以便在插入具有相同 ID 的多个 USB 设备时触发 Python 脚本。我希望该脚本能够可靠地为每个设备生成 CSV 文件,同时处理最多四个设备。但是,脚本有时无法启动,尤其是插入多个设备时,会导致行为不一致。
如果您想要多个实例,您需要显式地将您的.service 转换为多实例(模板化)单元。否则,如果一个实例仍在运行,第二次启动将不会执行任何操作。这适用于所有服务类型 – 甚至适用于 Type=oneshot 服务。
将您的单位重命名为
[email protected]
。
将 udev 规则更改为
ENV{SYSTEMD_WANTS}="usb-trigger@%k.service"
。
更改 ExecStart 以将
%i
作为参数传递给脚本。
ExecStart=/path/to/my/python/script/uart.py %i
更改脚本以从
sys.argv
读取设备名称(最好使用 argparse
)。
parser = argparse.ArgumentParser()
parser.add_argument("device")
args = parser.parse_args()
print(f"I was called for {device!r}", file=sys.stderr) # ¹
从脚本中删除现在不需要的“find_com_port()”代码。
¹ (就此而言,您确实应该使用 stderr 来显示错误消息;您应该将 stdout 配置为行缓冲,否则消息可能会被缓冲并且不会立即显示在日志中;理想情况下,您应该通过以下方式使用 syslog
syslog.syslog()
而不是所有这些。)