我最近开始深入研究 Python 的异步代码,并且想知道为什么
asyncio.sleep
如此重要。
x
毫秒从麦克风发出的同步数据源。我的理想实现是,一旦消息准备好就发送,一旦收到消息就立即处理。
这必须是高效的,因为我们想要降低到
x = 20ms
(每 20 毫秒从麦克风接收到帧)。
代码如下:
import asyncio
import msgpack
import os
import pyaudio
import ssl
import websockets
from threading import Thread
from queue import Queue
from dotenv import load_dotenv
# some utilities
from src.utils.constants import CHANNELS, CHUNK, FORMAT, RATE
from .utils import websocket_data_packet
load_dotenv()
QUEUE_MAX_SIZE = 10
MY_URL = os.environ.get("WEBSOCKETS_URL")
ssl_context = ssl.SSLContext()
class MicrophoneStreamer(object):
"""This handles the microphone and yields chunks of data when they are ready."""
chunk: int = CHUNK
channels: int = CHANNELS
format: int = FORMAT
rate: int = RATE
def __init__(self):
self._pyaudio = pyaudio.PyAudio()
self.is_stream_open: bool = True
self.stream = self._pyaudio.open(
format=self.format,
channels=self.channels,
rate=self.rate,
input=True,
frames_per_buffer=self.chunk,
)
def __iter__(self):
while self.is_stream_open:
yield self.stream.read(self.chunk)
def close(self):
self.is_stream_open = False
self.stream.close()
self._pyaudio.terminate()
async def consumer(websocket):
async for message in websocket:
print(f"Received message: {msgpack.unpackb(message)}")
async def producer(websocket, audio_queue):
while True:
print("Sending chunck")
chunck = audio_queue.get()
await websocket.send(msgpack.packb(websocket_data_packet(chunck)))
# THE FOLLOWING LINE IS IMPORTANT
await asyncio.sleep(0.02)
async def handler(audio_queue):
websocket = await websockets.connect(MY_URL, ssl=ssl_context)
async with websockets.connect(MY_URL, ssl=ssl_context) as websocket:
print("Websocket opened")
consumer_task = asyncio.create_task(consumer(websocket))
producer_task = asyncio.create_task(producer(websocket, audio_queue))
done, pending = await asyncio.wait(
[consumer_task, producer_task],
return_when=asyncio.FIRST_COMPLETED,
timeout=60,
)
for task in pending:
task.cancel()
# TODO: is the following useful?
await websocket.close()
def run(audio_queue: Queue):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(handler(audio_queue))
loop.close()
def main():
audio_queue = Queue(maxsize=5)
# the iterator is synchronous
for i, chunk in enumerate(MicrophoneStreamer()):
print("Iteration", i)
# to simulate condition wakeword detected
if i == 2:
thread = Thread(
target=run,
args=(audio_queue,),
)
thread.start()
# adds to queue
if audio_queue.full():
_ = audio_queue.get_nowait()
audio_queue.put_nowait(chunk)
if __name__ == "__main__":
main()
有一句话我在制作人里评论了
# THE FOLLOWING LINE IS IMPORTANT
。
如果我不在生产者中添加
asyncio.sleep(...)
,则永远不会收到来自消费者的消息。
当我在生产者中添加
asyncio.sleep(0)
时,会收到来自消费者的消息,但很晚且零星地收到。
当我在生产者中添加
asyncio.sleep(0.02)
时,来自消费者的消息会按时收到。
为什么会出现这种现象以及如何解决?为了每 20 毫秒发送一次消息,我不能每次迭代睡眠 20 毫秒,这可能会搞乱整个过程。
(注意,我发现了这个睡眠修复这个问题)
我认为如果迭代器是异步的,这就能解决问题,但事实并非如此。如果你想查看实现,我在过去几天开了另一个帖子这里。
我还尝试更深入地了解事件循环的工作原理。根据我的理解,
asyncio.sleep
对于事件循环决定执行哪个任务以及在它们之间切换是必要的 - 例如,我们在创建任务后使用它来触发任务启动。
这对我来说似乎有点奇怪。有解决办法吗?
这行在异步代码中是不正确的:
chunck = audio_queue.get()
-> 它将阻塞,直到队列中有一个值要读取,并且当它阻塞时,没有其他异步任务运行 - 它应该检查是否可以从中读取某些内容队列,如果没有,将代码释放到 asyncio 循环(首先,不需要等待 20ms,只需 asyncio.sleep(0) 就足以让事情继续进行)
from queue import Queue, Empty
...
async def producer(websocket, audio_queue):
while True:
print("Sending chunck")
try:
chunck = audio_queue.get_nowait()
except Empty:
await asyncio.sleep(0)
continue
await websocket.send(msgpack.packb(websocket_data_packet(chunck)))
# THE FOLLOWING LINE IS IMPORTANT
await asyncio.sleep(0)
这样,您将获得更多对 asyncio 循环的调用,以便其他任务可以运行,并且您也可以在函数的最后一行中使用值“0”。
您必须记住的是,异步编程实现了协作并发执行,并且代码将仅在当前任务之外的“空间”中执行,其中当前代码将控制权显式传递给事件循环。在您的原始实现中,事件循环只能在到达此
websocket.send
行时逐步执行由 asyncio.sleep
任务安排的任何任务 - 否则,它将在下一次迭代中运行到 audio_queue.get()
,并阻止所有内容 - 包括任何后台 I/O 回调。通过将 get
变为非阻塞并插入额外的 await asyncio.sleep(0)
(是的,这是当您不需要等待任何内容时将控制权传递给异步循环的官方方法),它将运行 I/ O 在其他任务中,因为它等待线程队列中出现某些内容。
对于任何想知道的人,您可以前往这个精彩的讨论。我花了(太)长时间寻找它