我正在开发一个基于 Raspberry Pi 的设备,通过 OBD 插头与汽车进行通信。我在 RPi 4 上有一个 CAN 总线帽子 (https://www.waveshare.com/wiki/2-CH_CAN_HAT)。
但我似乎无法让过滤器工作。
我可以通过在帽子提供的演示中添加行来设置工作过滤器:
import os
import can
import time
os.system('sudo ip link set can0 type can bitrate 500000')
os.system('sudo ifconfig can0 up')
can0 = can.interface.Bus(channel='can0', bustype='socketcan') # socketcan_native
can0.set_filters([{"can_id" : 0x688, "can_mask" : 0xFFF, "extended" : False}])
timeout = 999.0
start_time = time.time()
while time.time() - start_time < timeout:
msg = can0.recv(0.1)
if msg is not None:
print("Received message on can0:", msg)
else:
print('Timeout occurred, no message.')
can0.shutdown()
os.system('sudo ifconfig can0 down')
但是当我尝试在代码中以相同的方式设置过滤器时,它不起作用,就好像根本没有设置过滤器并且所有 ID 都能够通过,我的代码有点长,所以我将发布仅相关位。
CAN总线设置:
#configuration des parametres du CAN0 sur le terminal à distance
os.system('sudo ip link set can0 type can bitrate 500000')
os.system('sudo ifconfig can0 txqueuelen 65536')
os.system('sudo ifconfig can0 up')
#configuration des parametres du CAN1 (can bsi) sur le terminal à distance
os.system('sudo ip link set can1 type can bitrate 500000')
os.system('sudo ifconfig can1 txqueuelen 65536')
os.system('sudo ifconfig can1 up')
#affichge de l'etat des can
os.system('dmesg | grep spi')
#configuration de l'interface du can0
can0 = can.interface.Bus(channel = 'can0', bustype = 'socketcan')# socketcan_native
print("can0 initialised \n")
#configuration de l'interface du can1
can1 = can.interface.Bus(channel = 'can1', bustype = 'socketcan')# socketcan_native
print("can1 initialised \n")
过滤器设置(代码下方):
#selection du bus de communication
if bus == 1 :
canbus = can0
else:
canbus = can1
#paramétrage du filtre
print("avant filtre")
canbus.set_filters([{"can_id" : repID, "can_mask" : 0xFFF, "extended" : False}])
print("apres filtre")
我尝试为两条总线设置过滤器,而不是使用我的变量“canbus”,它并没有改变结果。也许这与动态过滤器设置有关?我大约每秒更换一次过滤器。
这是编辑过滤器的完整子功能,repID 和 reqID 是 CAN 地址,例如 0x650 或 0x742。
def request_info_PSA(reqID, repID, bus):
"""
recherche les références du calculateur à l'adresse spécifiée en entrée'
Parameters
----------
reqID : INT (valeur en hexa) adresse de requete du calculateur
repID : INT (valeur en hexa) adresse de réponse du calculateur
bus : INT numéro du bus a utiliser (1 ou 2)
Returns
-------
refPart : STRING référence du matériel
refPart2 : STRING référence complémentaire du matériel
"""
refComplete = False
refPart = ""
refPart2 = ""
#selection du bus de communication
if bus == 1 :
canbus = can0
else:
canbus = can1
#paramétrage du filtre
print("avant filtre")
canbus.set_filters([{"can_id" : repID, "can_mask" : 0xFFF, "extended" : False}])
print("apres filtre")
startTime = time.time()
trame = 1
while refComplete == False :
# sortie de boucle (timeout)
if (time.time() - startTime > 2.0) :
refPart = refPart2 = "Pas de reponse"
break
# Envoi des différentes trames de requete
if trame == 1 :
msg_REQ = can.Message(is_extended_id=False, arbitration_id=reqID, data=dem_Acces_1)
time.sleep(0.1)
trame = 2
elif trame == 2 :
msg_REQ = can.Message(is_extended_id=False, arbitration_id=reqID, data=dem_Acces_2)
time.sleep(0.1)
trame = 3
elif trame == 3 :
msg_REQ = can.Message(is_extended_id=False, arbitration_id=reqID, data=dem_Acces_3)
time.sleep(0.1)
trame = 1
canbus.send(msg_REQ)
# lecture du buffer
msg_REP = canbus.recv(0.1)
if msg_REP == None :
continue
# Réponse du véhicule selon cas 1 ou 2
if comp_CAN(msg_REP.data,rep_Acces_1) or comp_CAN(msg_REP.data, rep_Acces_2):
canbus.stop_all_periodic_tasks(remove_tasks=True)
msg_REQ = can.Message(is_extended_id=False, arbitration_id=reqID, data=data_PSA_1)
canbus.send(msg_REQ)
# récupération des références
while refComplete == False :
# lecture du buffer
msg_REP = canbus.recv(timeout=0.5)
if msg_REP == None :
break
# Reception de la premiere trame de réponse
if (msg_REP.data[0]==0x10):
for i in range(0,4):
refPart = refPart + "%02x" % msg_REP.data[i+4]
# Envoi de la demande des trames suivantes
msg_REQ = can.Message(is_extended_id=False, arbitration_id=reqID, data=data_extend_PSA)
canbus.send(msg_REQ)
# Reception de la seconde trame de réponse
elif msg_REP.data[0]==0x21:
refPart = refPart + "%02x" % msg_REP.data[1]
for i in range(0,4) :
refPart2 = refPart2 + "%02x" % msg_REP.data[i+4]
# Reception de la derniere trame de réponse
elif msg_REP.data[0]==0x22:
refPart2 = refPart2 + "%02x" % msg_REP.data[1]
refComplete = True
# Timeout
elif (time.time() - startTime > 2.0) :
break
# Réponse du véhicule selon cas 3
elif comp_CAN(msg_REP.data,rep_Acces_31) or comp_CAN(msg_REP.data,rep_Acces_32):
canbus.stop_all_periodic_tasks(remove_tasks=True)
msg_REQ = can.Message(is_extended_id=False, arbitration_id=reqID, data=data_PSA_2)
canbus.send(msg_REQ)
# récupération des références
while refComplete == False :
# lecture du buffer
msg_REP = canbus.recv(timeout=0.5)
if msg_REP == None :
break
# Reception de la premiere trame de réponse
if (msg_REP.data[0]==0x10):
for i in range(0,3):
refPart = refPart + "%02x" % msg_REP.data[i+5]
# Envoi de la demande des trames suivantes
msg_REQ = can.Message(is_extended_id=False, arbitration_id=reqID, data=data_extend_PSA)
canbus.send(msg_REQ)
# Reception de la seconde trame de réponse
elif msg_REP.data[0]==0x21:
for i in range(0,2) :
refPart = refPart + "%02x" % msg_REP.data[i+1]
for j in range (0,3) :
refPart2 = refPart2 + "%02x" % msg_REP.data[j+5]
# Reception de la derniere trame de réponse
elif msg_REP.data[0]==0x22:
for i in range (0,2) :
refPart2 = refPart2 + "%02X" % msg_REP.data[i+1]
refComplete = True
# Timeout
elif (time.time() - startTime > 2.0) :
break
# Sortie de fonction
return refPart, refPart2
有什么想法吗?
我已设法找到问题所在。
调用过滤器时,它们在当前 RX 缓冲区之后应用,因此我收到的那些不需要的消息来自应用过滤器之前。
我只需要弄清楚如何在设置过滤器后刷新 rx 缓冲区,它应该可以解决问题,我会更新。