在使用需要很长时间的函数时阻止代码停止

问题描述 投票:0回答:1

我正在尝试使用连接到 Pico W 的 Adafruit IO 作为某些 Neopixel 的遥控器。读取该值大约需要 0.25 秒,这会破坏需要延迟的 LED 模式。不使用 time.sleep 来延迟不会改变任何东西 - 因为无论如何读取 feed 都会停止代码。我尝试过在循环中、在函数中、在带有 asyncio 的异步函数中读取它,但所有这些都没有解决问题 - 它们没有改变任何东西。我可能做错了 asyncio 部分,但我不知道也找不到好的例子。

有谁知道如何让阅读发生得更快,或者在后台发生。

这是我与 asyncio 一起使用的代码:

import os
import ssl
import wifi
import socketpool
import microcontroller
import adafruit_requests
from adafruit_io.adafruit_io import IO_HTTP, AdafruitIO_RequestError

wifi.radio.connect(os.getenv('CIRCUITPY_WIFI_SSID'),
                   os.getenv('CIRCUITPY_WIFI_PASSWORD'))
aio_username = os.getenv('aio_username')
aio_key = os.getenv('aio_key')

pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())
io = IO_HTTP(aio_username, aio_key, requests)
print("connected to io")

statefeed = io.get_feed(<insert feed here>)

async def statefunc():
    global state
    state = io.receive_data(statefeed["key"])["value"] # This is the part that takes 0.25 seconds.
    
async def main():
    asyncstate = asyncio.create_task(statefunc())
    await asyncio.gather(asyncstate)

while True:
    asyncio.run(main())

    if state == "0":
        # do a thing

    if state == "1":
       # do another thing```
python multithreading asynchronous iot adafruit
1个回答
0
投票

为了解决这个问题,您可以使用不同的方法而不依赖 asyncio,因为微控制器不支持真正的并发线程。

这里有一种方法,使用基于计时器的检查来降低轮询 Adafruit IO 的频率,从而使您的 LED 模式顺利运行:

使用计时器进行轮询:您可以设置一个计时器,而不是在每次循环迭代中不断轮询 Adafruit IO。当计时器到期时,您轮询 Adafruit IO,更新状态并重置计时器。

以下是构建代码的方法:

import os
import ssl
import wifi
import socketpool
import microcontroller
import adafruit_requests
from adafruit_io.adafruit_io import IO_HTTP, AdafruitIO_RequestError
import time

# Connect to WiFi
wifi.radio.connect(os.getenv('CIRCUITPY_WIFI_SSID'),
                   os.getenv('CIRCUITPY_WIFI_PASSWORD'))

# Adafruit IO credentials
aio_username = os.getenv('aio_username')
aio_key = os.getenv('aio_key')

# Set up socket pool and HTTP session for Adafruit IO
pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())
io = IO_HTTP(aio_username, aio_key, requests)

print("Connected to Adafruit IO")

# Fetch the feed
statefeed = io.get_feed(<insert feed here>)

# Polling configuration
POLL_INTERVAL = 5  # Poll every 5 seconds
last_poll_time = time.monotonic()
state = None  # Initial state

def update_state():
    """Update the state by polling Adafruit IO."""
    global state
    try:
        state = io.receive_data(statefeed["key"])["value"]
    except AdafruitIO_RequestError:
        print("Error fetching state")
        state = None

while True:
    current_time = time.monotonic()
    
    # If the poll interval has passed, update the state
    if current_time - last_poll_time >= POLL_INTERVAL:
        update_state()
        last_poll_time = current_time

    # Check the state and perform the corresponding action
    if state == "0":
        # do a thing (e.g., light up LEDs in a certain pattern)
        pass
    elif state == "1":
        # do another thing (e.g., light up LEDs in a different pattern)
        pass

    # Run your LED patterns or other non-blocking code here
    # ...

“在此处插入 feed” 替换为适当的 Feed 名称或 ID。此代码将每 5 秒轮询 Adafruit IO 以更新状态。在这些轮询间隔之间,您可以执行 LED 模式或其他非阻塞任务,而不会被 IO 获取操作中断。根据需要调整 POLL_INTERVAL。

© www.soinside.com 2019 - 2024. All rights reserved.