我正在使用 MicroPython (v1.22.2) 开发低功耗蓝牙应用程序,其中包含用于外围设备 (Raspberry Pi Pico W) 的 aioble 库和用于在 Raspberry Pi 3 B+ 上运行的中央设备的 Python bluepy 库。我面临的问题是 BLE 连接在几秒钟(10-20 秒)后反复取消。外围设备发送有关传感器状态(打开/关闭)的通知,中央设备接收这些通知。
这是所发生情况的简化版本:
# Bluetooth GATT setup
_DOOR_SERVICE_UUID = bluetooth.UUID('12345678-1234-5678-1234-56789abcdef0')
_DOOR_STATE_UUID = bluetooth.UUID('12345678-1234-5678-1234-56789abcdef1')
_ADV_APPEARANCE_GENERIC_DOOR_SENSOR = const(768)
_ADV_INTERVAL_MS = 250_000
door_service = aioble.Service(_DOOR_SERVICE_UUID)
door_state_characteristic = aioble.Characteristic(door_service, _DOOR_STATE_UUID, read=True, notify=True)
aioble.register_services(door_service)
...
...
# Bluetooth advertising task
async def peripheral_task():
global current_connection
while True:
if not current_connection:
print("No Connection, attempting to advertise")
try:
async with await aioble.advertise(
_ADV_INTERVAL_MS,
name="DoorSensor",
services=[_DOOR_SERVICE_UUID],
appearance=_ADV_APPEARANCE_GENERIC_DOOR_SENSOR,
) as connection:
current_connection = connection
print("Connection from", connection.device)
await connection.disconnected()
except asyncio.CancelledError as e:
print(f"Cancelled: {str(e)}")
current_connection = None
except Exception as e:
print(f"Unhandled exception: {str(e)}")
finally:
current_connection = None
else:
if not current_connection.is_connected():
print("lost connection")
print("connected")
await asyncio.sleep(1)
...
# loop which checks door state and notifies central
...
if current_connection:
door_state_characteristic.notify(current_connection, _encode_door_state(1))
await asyncio.sleep(1)
from bluepy.btle import Peripheral, DefaultDelegate
class MyDelegate(DefaultDelegate):
def __init__(self):
DefaultDelegate.__init__(self)
def handleNotification(self, cHandle, data):
print("Notification from Handle: {}".format(cHandle))
state = int.from_bytes(data, byteorder='little')
print("Door State Changed:", "Open" if state else "Closed")
def connect_to_sensor():
peri = Peripheral("D8:3A:DD:9E:82:B1", "public")
peri.setDelegate(MyDelegate())
print("Connected to device")
# Get service and characteristic
svc = peri.getServiceByUUID("12345678-1234-5678-1234-56789abcdef0")
ch = svc.getCharacteristics("12345678-1234-5678-1234-56789abcdef1")[0]
print("Service and characteristic found")
# Correct CCCD UUID for enabling notifications
cccd_uuid = "00002902-0000-1000-8000-00805f9b34fb"
cccd = ch.getDescriptors(forUUID=cccd_uuid)[0]
cccd.write(b"\x01\x00", True) # Enable notifications
print("Subscribed to notifications")
return peri
p = None
while True:
if p:
try:
if p.waitForNotifications(1.0):
continue
print("Waiting for notifications...")
except Exception as e:
print(f"got some weird error\n{e}")
p = None
elif not p:
p = connect_to_sensor()
p.disconnect()
No Connection, attempting to advertise
Connection from Device(ADDR_PUBLIC, e4:aa:ea:fb:e0:32, CONNECTED)
It's open # notification
It's open # notification
Cancelled:
No Connection, attempting to advertise
Connection from Device(ADDR_PUBLIC, e4:aa:ea:fb:e0:32, CONNECTED)
It's open # notification
Cancelled:
No Connection, attempting to advertise
Connection from Device(ADDR_PUBLIC, e4:aa:ea:fb:e0:32, CONNECTED)
It's open # notification
It's open # notification
使用 bluetoothctl 测试我的机器(Ubuntu 22.04)的连接似乎表明外围设备断开连接,而不是中央设备。
意外断开可能是由于
的默认timeout_ms造成的await connection.disconnected()
在您的外围代码中。
我会用:
await connection.disconnected(timeout_ms=None)