我正在尝试使用连接到 Pico W 的 Adafruit IO 作为某些 Neopixel 的遥控器。读取该值大约需要 0.25 秒,这会破坏需要延迟的 LED 模式。不使用 time.sleep 来延迟不会改变任何东西 - 因为无论如何读取 feed 都会停止代码。我尝试过在循环中、在函数中、在带有 asyncio 的异步函数中读取它,但所有这些都没有解决问题 - 它们没有改变任何东西。我可能做错了 asyncio 部分,但我不知道也找不到好的例子。
有谁知道如何让阅读发生得更快,或者在后台发生。
这是我与 asyncio 一起使用的代码:
import os
import ssl
import wifi
import socketpool
import microcontroller
import adafruit_requests
from adafruit_io.adafruit_io import IO_HTTP, AdafruitIO_RequestError
wifi.radio.connect(os.getenv('CIRCUITPY_WIFI_SSID'),
os.getenv('CIRCUITPY_WIFI_PASSWORD'))
aio_username = os.getenv('aio_username')
aio_key = os.getenv('aio_key')
pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())
io = IO_HTTP(aio_username, aio_key, requests)
print("connected to io")
statefeed = io.get_feed(<insert feed here>)
async def statefunc():
global state
state = io.receive_data(statefeed["key"])["value"] # This is the part that takes 0.25 seconds.
async def main():
asyncstate = asyncio.create_task(statefunc())
await asyncio.gather(asyncstate)
while True:
asyncio.run(main())
if state == "0":
# do a thing
if state == "1":
# do another thing```
为了解决这个问题,您可以使用不同的方法而不依赖 asyncio,因为微控制器不支持真正的并发线程。
这里有一种方法,使用基于计时器的检查来降低轮询 Adafruit IO 的频率,从而使您的 LED 模式顺利运行:
使用计时器进行轮询:您可以设置一个计时器,而不是在每次循环迭代中不断轮询 Adafruit IO。当计时器到期时,您轮询 Adafruit IO,更新状态并重置计时器。
以下是构建代码的方法:
import os
import ssl
import wifi
import socketpool
import microcontroller
import adafruit_requests
from adafruit_io.adafruit_io import IO_HTTP, AdafruitIO_RequestError
import time
# Connect to WiFi
wifi.radio.connect(os.getenv('CIRCUITPY_WIFI_SSID'),
os.getenv('CIRCUITPY_WIFI_PASSWORD'))
# Adafruit IO credentials
aio_username = os.getenv('aio_username')
aio_key = os.getenv('aio_key')
# Set up socket pool and HTTP session for Adafruit IO
pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())
io = IO_HTTP(aio_username, aio_key, requests)
print("Connected to Adafruit IO")
# Fetch the feed
statefeed = io.get_feed(<insert feed here>)
# Polling configuration
POLL_INTERVAL = 5 # Poll every 5 seconds
last_poll_time = time.monotonic()
state = None # Initial state
def update_state():
"""Update the state by polling Adafruit IO."""
global state
try:
state = io.receive_data(statefeed["key"])["value"]
except AdafruitIO_RequestError:
print("Error fetching state")
state = None
while True:
current_time = time.monotonic()
# If the poll interval has passed, update the state
if current_time - last_poll_time >= POLL_INTERVAL:
update_state()
last_poll_time = current_time
# Check the state and perform the corresponding action
if state == "0":
# do a thing (e.g., light up LEDs in a certain pattern)
pass
elif state == "1":
# do another thing (e.g., light up LEDs in a different pattern)
pass
# Run your LED patterns or other non-blocking code here
# ...
将 “在此处插入 feed” 替换为适当的 Feed 名称或 ID。此代码将每 5 秒轮询 Adafruit IO 以更新状态。在这些轮询间隔之间,您可以执行 LED 模式或其他非阻塞任务,而不会被 IO 获取操作中断。根据需要调整 POLL_INTERVAL。