我有以下测试代码,试图将不同的协程串联起来。我的想法是:用一个协程下载数据,下载完成后把数据交给第二个协程,由第二个协程处理数据。如果跳过 process_data 步骤,下面的代码可以正常运行;但只要加入 process_data 步骤(即尝试将协程串联起来),就会失败。该如何解决?
import asyncio
import time
# Dummy "URLs"; download_dummy sleeps this many seconds for each one.
task_inputs = [0, 1, 2, 3, 4, 5, 4, 3, 4]
async def download_dummy(url):
    """Simulate downloading *url* and return *url* itself as the payload.

    The numeric value of *url* doubles as the simulated download time in
    seconds.
    """
    await asyncio.sleep(url)  # stand-in for real network I/O
    print(f'downloaded {url}')
    return url
async def process_data(data):
    """Simulate processing *data*: wait one second, then return ``data*2``.

    NOTE: *data* must already be a value that supports ``* 2`` (e.g. a
    number). Passing the un-awaited coroutine returned by
    ``download_dummy(...)`` makes ``data*2`` raise
    ``TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'``.
    """
    await asyncio.sleep(1)  # simulated processing time
    processed_data = data*2
    print(f"processed {data}")
    return processed_data
async def main(task_inputs):
    """Start one download-then-process task per input and print the results.

    As written, this reproduces the failure being asked about: see the BUG
    comment on the ``create_task`` line.
    """
    task_handlers = []
    print(f"started at {time.strftime('%X')}")
    # asyncio.TaskGroup (Python 3.11+): exiting the ``async with`` block waits
    # for every task and raises an ExceptionGroup if any of them failed.
    async with asyncio.TaskGroup() as tg:
        for task in task_inputs:
            # BUG: download_dummy(task) only creates a coroutine object; nothing
            # awaits it, so process_data receives the coroutine itself instead
            # of the downloaded value and fails on ``data*2``.
            res = tg.create_task(process_data(download_dummy(task)))
            # res = tg.create_task(download_dummy(task))
            task_handlers.append(res)
    print(f"finished at {time.strftime('%X')}")
    # Every task has finished here, so .result() will not block.
    results = [task_handler.result() for task_handler in task_handlers]
    print(results)
# Script entry point: run main() on a fresh event loop.
asyncio.run(main(task_inputs))
我得到的错误信息相当明确:第一个协程传给第二个协程时,似乎并没有真正执行。但我不确定如何优雅地解决这个问题。
+ Exception Group Traceback (most recent call last):
| File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\pydevd.py", line 2252, in <module>
| main()
| File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\pydevd.py", line 2234, in main
| globals = debugger.run(setup['file'], None, None, is_module)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\pydevd.py", line 1544, in run
| return self._exec(is_module, entry_point_fn, module_name, file, globals, locals)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\pydevd.py", line 1551, in _exec
| pydev_imports.execfile(file, globals, locals) # execute the script
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
| exec(compile(contents+"\n", file, 'exec'), glob, loc)
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 31, in <module>
| asyncio.run(main(task_inputs))
| File "C:\Users\Tue J Boesen\AppData\Local\Programs\Python\Python312-arm64\Lib\asyncio\runners.py", line 194, in run
| return runner.run(main)
| ^^^^^^^^^^^^^^^^
| File "C:\Users\Tue J Boesen\AppData\Local\Programs\Python\Python312-arm64\Lib\asyncio\runners.py", line 118, in run
| return self._loop.run_until_complete(task)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "C:\Users\Tue J Boesen\AppData\Local\Programs\Python\Python312-arm64\Lib\asyncio\base_events.py", line 687, in run_until_complete
| return future.result()
| ^^^^^^^^^^^^^^^
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 21, in main
| async with asyncio.TaskGroup() as tg:
| File "C:\Users\Tue J Boesen\AppData\Local\Programs\Python\Python312-arm64\Lib\asyncio\taskgroups.py", line 145, in __aexit__
| raise me from None
| ExceptionGroup: unhandled errors in a TaskGroup (9 sub-exceptions)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
| processed_data = data*2
| ~~~~^~
| TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
+---------------- 2 ----------------
| Traceback (most recent call last):
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
| processed_data = data*2
| ~~~~^~
| TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
+---------------- 3 ----------------
| Traceback (most recent call last):
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
| processed_data = data*2
| ~~~~^~
| TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
+---------------- 4 ----------------
| Traceback (most recent call last):
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
| processed_data = data*2
| ~~~~^~
| TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
+---------------- 5 ----------------
| Traceback (most recent call last):
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
| processed_data = data*2
| ~~~~^~
| TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
+---------------- 6 ----------------
| Traceback (most recent call last):
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
| processed_data = data*2
| ~~~~^~
| TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
+---------------- 7 ----------------
| Traceback (most recent call last):
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
| processed_data = data*2
| ~~~~^~
| TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
+---------------- 8 ----------------
| Traceback (most recent call last):
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
| processed_data = data*2
| ~~~~^~
| TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
+---------------- 9 ----------------
| Traceback (most recent call last):
| File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
| processed_data = data*2
| ~~~~^~
| TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
+------------------------------------
你有:
...
res = tg.create_task(process_data(download_dummy(task)))
...
process_data
期望接收一个数字,并将其乘以 2。但您传给它的是 download_dummy(task)
,它只是一个协程对象:调用 async 函数并不会执行函数体,只有在 await 它(或将其包装成任务)之后,下载才会真正运行并得到结果。
由于您想模拟先从 URL 下载数据、再对其进行处理,我认为最清晰的解决方案是将
download_dummy
重命名为 download_and_process_data
。这样,它在下载完数据之后会 await 调用 process_data
,如下所示:
import asyncio
import time
task_inputs = [0,1,2,3,4,5,4,3,4]
async def download_and_process_data(url):
    """Simulate downloading *url*, then process the downloaded value.

    The download is awaited first, so process_data receives the actual value
    (here *url* itself) rather than a coroutine object.
    """
    await asyncio.sleep(url)  # simulated download time in seconds
    print(f'downloaded {url}')
    processed = await process_data(url)
    return processed
async def process_data(data, *, delay=1):
    """Simulate processing *data* and return it doubled.

    Args:
        data: An already-downloaded value (not a coroutine) supporting ``* 2``.
        delay: Simulated processing time in seconds. Defaults to 1, the
            previously hard-coded value; pass 0 to skip the wait (e.g. in tests).

    Returns:
        ``data * 2``.
    """
    await asyncio.sleep(delay)
    processed_data = data * 2
    print(f"processed {data}")
    return processed_data
async def main(task_inputs):
    """Download and process every input concurrently, then print the results.

    Each input gets one task that runs download_and_process_data end to end,
    so work for different inputs overlaps instead of running one by one.

    Args:
        task_inputs: Iterable of dummy "URLs" (numbers used as sleep times).
    """
    print(f"started at {time.strftime('%X')}")
    # asyncio.TaskGroup (Python 3.11+) waits for every task when the
    # ``async with`` block exits, so all results are available afterwards.
    async with asyncio.TaskGroup() as tg:
        task_handlers = [
            tg.create_task(download_and_process_data(task))
            for task in task_inputs
        ]
    print(f"finished at {time.strftime('%X')}")
    results = [task_handler.result() for task_handler in task_handlers]
    print(results)
if __name__ == "__main__":
    # Run the demo only when executed as a script, not when imported.
    asyncio.run(main(task_inputs))
打印:
started at 05:32:48
downloaded 0
downloaded 1
processed 0
downloaded 2
processed 1
downloaded 3
downloaded 3
processed 2
downloaded 4
processed 3
downloaded 4
downloaded 4
processed 3
downloaded 5
processed 4
processed 4
processed 4
processed 5
finished at 05:32:54
[0, 2, 4, 6, 8, 10, 8, 6, 8]
或者,您可以保持
res = tg.create_task(process_data(download_dummy(task)))
不变,但将 process_data
改为接收一个协程并在内部 await 它,如下所示:
import asyncio
import time
# Dummy "URLs"; download_dummy sleeps this many seconds for each one.
task_inputs = [0, 1, 2, 3, 4, 5, 4, 3, 4]
async def download_dummy(url):
    """Pretend to download *url*; the returned payload is *url* itself."""
    payload = url
    await asyncio.sleep(payload)  # simulated transfer time in seconds
    print(f'downloaded {url}')
    return payload
async def process_data(coro, *, delay=1):
    """Await *coro* to obtain the data, then simulate processing it.

    Args:
        coro: An awaitable (e.g. the coroutine returned by
            ``download_dummy(url)``) whose result is the data to process.
        delay: Simulated processing time in seconds. Defaults to 1, the
            previously hard-coded value; pass 0 to skip the wait (e.g. in tests).

    Returns:
        The awaited data multiplied by 2.
    """
    # Awaiting here is what actually runs the download; calling
    # download_dummy(...) on its own only creates the coroutine object.
    data = await coro
    await asyncio.sleep(delay)
    processed_data = data * 2
    print(f"processed {data}")
    return processed_data
async def main(task_inputs):
    """Download and process every input concurrently, then print the results.

    process_data receives the un-awaited download_dummy coroutine and awaits
    it itself, so each task performs both the download and the processing.

    Args:
        task_inputs: Iterable of dummy "URLs" (numbers used as sleep times).
    """
    print(f"started at {time.strftime('%X')}")
    # asyncio.TaskGroup (Python 3.11+) waits for every task when the
    # ``async with`` block exits, so all results are available afterwards.
    async with asyncio.TaskGroup() as tg:
        task_handlers = [
            tg.create_task(process_data(download_dummy(task)))
            for task in task_inputs
        ]
    print(f"finished at {time.strftime('%X')}")
    results = [task_handler.result() for task_handler in task_handlers]
    print(results)
if __name__ == "__main__":
    # Run the demo only when executed as a script, not when imported.
    asyncio.run(main(task_inputs))
在这种情况下,您传给
process_data
的是一个协程,必须先 await 它才能拿到要处理的数据。这样可行,但 process_data 同时负责等待下载和处理数据,逻辑不够直观。