I'm having trouble scheduling work with asyncio. I have code like this:
import asyncio

async def stream():
    char_string = "Hi. Hello. Thank you."
    for char in char_string:
        await asyncio.sleep(0.1)  # something time consuming happening here
        print("got char:", char)
        yield char

async def sentences_generator():
    sentence = ""
    async for char in stream():
        sentence += char
        if char in [".", "!", "?"]:
            print("got sentence: ", sentence)
            yield sentence
            sentence = ""

async def process_sentence(sentence: str):
    print("waiting for processing sentence: ", sentence)
    await asyncio.sleep(len(sentence) * 0.1)
    print("sentence processed!")

async def main():
    i = 0
    async for sentence in sentences_generator():
        print("processing sentence: ", i)
        await process_sentence(sentence)
        i += 1

asyncio.run(main())
This is my output:
got char: H
got char: i
got char: .
got sentence: Hi.
processing sentence: 0
waiting for processing sentence: Hi.
sentence processed!
got char:
got char: H
got char: e
got char: y
got char: .
got sentence: Hey.
processing sentence: 1
waiting for processing sentence: Hey.
sentence processed!
got char:
got char: H
got char: e
got char: l
got char: l
got char: o
got char: .
got sentence: Hello.
processing sentence: 2
waiting for processing sentence: Hello.
sentence processed!
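This is not optimal. While process_sentence is awaiting asyncio.sleep() (which stands in for some other time-consuming work), the next characters should already be fetched from the stream. So I expect output like this: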
got char: H
got char: i
got char: .
got sentence: Hi.
processing sentence: 0
waiting for processing sentence: Hi.
got char: # (space char)
got char: H
sentence processed!
got char: e
got char: y
got char: .
got sentence: Hey.
processing sentence: 1
waiting for processing sentence: Hey.
got char: # (space char)
got char: H
got char: e
sentence processed!
got char: l
got char: l
got char: o
got char: .
got sentence: Hello.
processing sentence: 2
waiting for processing sentence: Hello.
sentence processed!
How can I achieve this?
You can get the desired output by using a Queue, which you pass to both stream() and sentences_generator().
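The reason a queue helps: an async generator is pull-based, so stream() only advances when its consumer asks for the next item, and nobody asks while main() is awaiting process_sentence(). Running the producer in its own task and handing items over through an asyncio.Queue decouples the two. Here is a minimal sketch of just that idea; the names producer, consumer, and run_demo, the event list, and the shortened delays are illustrative assumptions, not part of your code:

```python
import asyncio

async def producer(queue: asyncio.Queue, events: list) -> None:
    # Hypothetical stand-in for stream(): emits items at its own pace.
    for item in "abc":
        await asyncio.sleep(0.01)
        events.append(f"produced {item}")
        await queue.put(item)
    await queue.put(None)  # sentinel: no more items

async def consumer(queue: asyncio.Queue, events: list) -> None:
    # Hypothetical stand-in for the processing loop: slow work per item.
    while (item := await queue.get()) is not None:
        events.append(f"consuming {item}")
        await asyncio.sleep(0.1)
        events.append(f"consumed {item}")

async def run_demo() -> list:
    events: list = []
    queue: asyncio.Queue = asyncio.Queue()
    # The producer runs as its own task, so it keeps going while the consumer sleeps.
    producer_task = asyncio.create_task(producer(queue, events))
    await consumer(queue, events)
    await producer_task
    return events

demo_events = asyncio.run(run_demo())
print("\n".join(demo_events))
```

In this sketch all three items are produced before the first one has finished being consumed, which is the kind of overlap you are asking for.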
Note: to match the output you posted, your char_string should be "Hi. Hey. Hello." rather than "Hi. Hello. Thank you.".
The code below gives the output you want, that is, other tasks keep running while process_sentence() is waiting:
import asyncio

async def stream(queue):
    char_string = "Hi. Hey. Hello."
    for char in char_string:
        await asyncio.sleep(0.1)  # something time-consuming happening here
        print("got char:", char)
        await queue.put(char)
    # Signal the end of the stream
    await queue.put(None)

async def sentences_generator(queue):
    sentence = ""
    while True:
        char = await queue.get()
        if char is None:
            break
        sentence += char
        if char in [".", "!", "?"]:
            print("got sentence:", sentence)
            yield sentence
            sentence = ""

async def process_sentence(sentence: str):
    print("waiting for processing sentence:", sentence)
    await asyncio.sleep(len(sentence) * 0.1)
    print("sentence processed!")

async def main():
    q = asyncio.Queue()
    # Run the stream coroutine concurrently
    stream_task = asyncio.create_task(stream(q))
    counter = 0
    async for sentence in sentences_generator(q):
        print("processing sentence:", counter)
        counter += 1
        sentence_task = asyncio.create_task(process_sentence(sentence))
        await sentence_task
    # Wait for the stream to finish
    await stream_task

# Run the coroutine
asyncio.run(main())
Running it produces the following output:
got char: H
got char: i
got char: .
got sentence: Hi.
processing sentence: 0
waiting for processing sentence: Hi.
got char: # (space char)
got char: H
sentence processed!
got char: e
got char: y
got char: .
got sentence: Hey.
processing sentence: 1
waiting for processing sentence: Hey.
got char: # (space char)
got char: H
got char: e
got char: l
sentence processed!
got char: l
got char: o
got char: .
got sentence: Hello.
processing sentence: 2
waiting for processing sentence: Hello.
sentence processed!
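If you would rather leave stream() and sentences_generator() exactly as they were in the question, the same queue idea can be wrapped in a small reusable helper. The sketch below is one possible way to do that, not a standard asyncio API: prefetch, slow_numbers, and run_prefetch_demo are hypothetical names, and the queue is deliberately unbounded, so a fast source is never slowed down by a slow consumer (no backpressure).

```python
import asyncio
from typing import AsyncIterator, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel marking the end of the source

async def prefetch(source: AsyncIterator[T]) -> AsyncIterator[T]:
    """Hypothetical helper: drain `source` in a background task and re-yield its items."""
    queue: asyncio.Queue = asyncio.Queue()  # unbounded, so put_nowait never fails

    async def pump() -> None:
        try:
            async for item in source:
                queue.put_nowait(item)
        finally:
            queue.put_nowait(_DONE)  # always unblock the consumer

    task = asyncio.create_task(pump())
    try:
        while True:
            item = await queue.get()
            if item is _DONE:
                break
            yield item
        await task  # surfaces an exception raised inside the source
    finally:
        task.cancel()  # no effect if the task has already finished

async def slow_numbers(events: list) -> AsyncIterator[int]:
    # Illustrative source standing in for sentences_generator().
    for n in range(3):
        await asyncio.sleep(0.01)
        events.append(f"got {n}")
        yield n

async def run_prefetch_demo() -> list:
    events: list = []
    async for n in prefetch(slow_numbers(events)):
        events.append(f"start {n}")
        await asyncio.sleep(0.1)  # slow processing, like process_sentence()
        events.append(f"done {n}")
    return events

prefetch_events = asyncio.run(run_prefetch_demo())
print("\n".join(prefetch_events))
```

With a helper like this, the loop in the question's main() would become `async for sentence in prefetch(sentences_generator()):` and the rest of the original code would stay unchanged.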