如何用 asyncio 将协程链接起来

问题描述 投票:0回答:1

我有以下测试代码,试图将不同的协程链接在一起。我的想法是:用一个协程下载数据,下载完成后把数据交给第二个协程,由它来处理数据。如果跳过 process_data 步骤,下面的代码可以正常运行;但只要加入 process_data 步骤(即尝试把协程链接起来),就会失败。该如何解决?

import asyncio
import time

task_inputs = [0,1,2,3,4,5,4,3,4]  # each value is both a simulated download time (seconds) and the "downloaded" payload

async def download_dummy(url):  # Simulated download: wait `url` seconds, then return `url` itself as the data.
    await asyncio.sleep(url)  # `url` doubles as the delay in seconds, so it must be a number here
    data = url
    print(f'downloaded {url}')
    return data

async def process_data(data):  # Simulated processing: wait 1 s, then return data*2; `data` must already be a value, not a coroutine.
    await asyncio.sleep(1)  # stand-in for real processing time
    processed_data = data*2
    print(f"processed {data}")
    return processed_data

async def main(task_inputs):  # Start one task per input inside a TaskGroup, wait for all of them, then print the results.
    task_handlers  = []
    print(f"started at {time.strftime('%X')}")
    async with asyncio.TaskGroup() as tg:
        for task in task_inputs:  # note: `task` is an input value (delay/payload), not an asyncio task
            res = tg.create_task(process_data(download_dummy(task)))
            # res = tg.create_task(download_dummy(task))  # this variant works; the line above fails because process_data receives the un-awaited coroutine object instead of the downloaded data
            task_handlers.append(res)

    print(f"finished at {time.strftime('%X')}")
    results = [task_handler.result() for task_handler in task_handlers]  # every task has finished once the TaskGroup block has exited
    print(results)

asyncio.run(main(task_inputs))

错误本身相当明显:第一个协程传给第二个协程时似乎并没有真正执行(process_data 收到的是协程对象,而不是下载到的数据,所以 `data*2` 报 TypeError),但我不确定该如何优雅地解决这个问题。

+ Exception Group Traceback (most recent call last):
  |   File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\pydevd.py", line 2252, in <module>
  |     main()
  |   File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\pydevd.py", line 2234, in main
  |     globals = debugger.run(setup['file'], None, None, is_module)
  |               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\pydevd.py", line 1544, in run
  |     return self._exec(is_module, entry_point_fn, module_name, file, globals, locals)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\pydevd.py", line 1551, in _exec
  |     pydev_imports.execfile(file, globals, locals)  # execute the script
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "C:\Program Files\JetBrains\PyCharm Community Edition 2024.1.4\plugins\python-ce\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
  |     exec(compile(contents+"\n", file, 'exec'), glob, loc)
  |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 31, in <module>
  |     asyncio.run(main(task_inputs))
  |   File "C:\Users\Tue J Boesen\AppData\Local\Programs\Python\Python312-arm64\Lib\asyncio\runners.py", line 194, in run
  |     return runner.run(main)
  |            ^^^^^^^^^^^^^^^^
  |   File "C:\Users\Tue J Boesen\AppData\Local\Programs\Python\Python312-arm64\Lib\asyncio\runners.py", line 118, in run
  |     return self._loop.run_until_complete(task)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "C:\Users\Tue J Boesen\AppData\Local\Programs\Python\Python312-arm64\Lib\asyncio\base_events.py", line 687, in run_until_complete
  |     return future.result()
  |            ^^^^^^^^^^^^^^^
  |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 21, in main
  |     async with asyncio.TaskGroup() as tg:
  |   File "C:\Users\Tue J Boesen\AppData\Local\Programs\Python\Python312-arm64\Lib\asyncio\taskgroups.py", line 145, in __aexit__
  |     raise me from None
  | ExceptionGroup: unhandled errors in a TaskGroup (9 sub-exceptions)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 2 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 3 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 4 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 5 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 6 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 7 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 8 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +---------------- 9 ----------------
    | Traceback (most recent call last):
    |   File "C:\Users\Tue J Boesen\Download\pythonProject\test.py", line 14, in process_data
    |     processed_data = data*2
    |                      ~~~~^~
    | TypeError: unsupported operand type(s) for *: 'coroutine' and 'int'
    +------------------------------------
python python-3.x python-asyncio
1个回答
0
投票

你有:

...
res = tg.create_task(process_data(download_dummy(task)))
...

`process_data` 期望接收一个数字,并将其乘以 2。但您传给它的是 `download_dummy(task)`,这是一个尚未被 await 的协程实例,所以 `data*2` 会抛出 TypeError。

由于您想模拟从 URL 下载一些数据然后对其进行处理,我认为最清晰的解决方案是将 `download_dummy` 重命名为 `download_and_process_data`,让它在下载完数据后直接 await `process_data`,如下所示:

import asyncio
import time

# Each value serves both as the simulated download time (seconds) and as the payload.
task_inputs = [0,1,2,3,4,5,4,3,4]

async def download_and_process_data(url):
    """Simulate downloading ``url``, then hand the payload to ``process_data``.

    The fake download waits ``url`` seconds and treats ``url`` itself as the
    downloaded data. That data is awaited through ``process_data``, so the
    caller receives the processed value rather than the raw payload.
    """
    await asyncio.sleep(url)
    print(f'downloaded {url}')
    # Chaining happens here: the processing coroutine is awaited directly
    # instead of being passed around un-awaited.
    return await process_data(url)

async def process_data(data, *, delay=1):
    """Simulate processing ``data`` and return the processed value.

    Args:
        data: The downloaded payload. It must support ``* 2`` (an int in this
            example) and must already be a value, not a coroutine.
        delay: Simulated processing time in seconds. The default of 1 matches
            the original hard-coded pause; pass 0 to skip the wait.

    Returns:
        ``data * 2``.
    """
    await asyncio.sleep(delay)
    processed_data = data * 2
    print(f"processed {data}")
    return processed_data

async def main(task_inputs):
    """Download and process every input concurrently, then print the results.

    One task per input is started inside an ``asyncio.TaskGroup``. Leaving the
    ``async with`` block waits for all of them; failures surface as an
    ``ExceptionGroup``. As a result, every ``.result()`` below is already
    available.
    """
    print(f"started at {time.strftime('%X')}")
    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(download_and_process_data(url)) for url in task_inputs
        ]
    print(f"finished at {time.strftime('%X')}")
    # Results are collected in input order, regardless of completion order.
    results = [task.result() for task in tasks]
    print(results)


if __name__ == "__main__":
    asyncio.run(main(task_inputs))

打印:

started at 05:32:48
downloaded 0
downloaded 1
processed 0
downloaded 2
processed 1
downloaded 3
downloaded 3
processed 2
downloaded 4
processed 3
downloaded 4
downloaded 4
processed 3
downloaded 5
processed 4
processed 4
processed 4
processed 5
finished at 05:32:54
[0, 2, 4, 6, 8, 10, 8, 6, 8]

或者,您也可以保持 `res = tg.create_task(process_data(download_dummy(task)))` 这一行不变,而把 `process_data` 改为接收一个协程,并在内部先 await 它,如下所示:

import asyncio
import time

# Each value serves both as the simulated download time (seconds) and as the payload.
task_inputs = [0,1,2,3,4,5,4,3,4]

async def download_dummy(url):
    """Pretend to download ``url``: wait ``url`` seconds, then return it as the payload."""
    await asyncio.sleep(url)
    print(f'downloaded {url}')
    return url

async def process_data(coro, *, delay=1):
    """Await ``coro`` for the downloaded payload, then simulate processing it.

    Args:
        coro: An awaitable that produces the data to process. Here it is the
            coroutine returned by ``download_dummy(...)``. It is awaited exactly
            once, so the download runs inside this task.
        delay: Simulated processing time in seconds. The default of 1 matches
            the original hard-coded pause; pass 0 to skip the wait.

    Returns:
        The awaited payload multiplied by 2.
    """
    data = await coro
    await asyncio.sleep(delay)
    processed_data = data * 2
    print(f"processed {data}")
    return processed_data

async def main(task_inputs):
    """Download and process every input concurrently, then print the results.

    Each task wraps ``process_data(download_dummy(url))``; ``process_data``
    awaits the download coroutine itself. Leaving the ``TaskGroup`` block waits
    for all tasks; failures surface as an ``ExceptionGroup``. As a result,
    every ``.result()`` below is already available.
    """
    print(f"started at {time.strftime('%X')}")
    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(process_data(download_dummy(url))) for url in task_inputs
        ]
    print(f"finished at {time.strftime('%X')}")
    # Results are collected in input order, regardless of completion order.
    results = [task.result() for task in tasks]
    print(results)


if __name__ == "__main__":
    asyncio.run(main(task_inputs))

在这种情况下,传给 `process_data` 的是一个协程,必须先 await 它才能拿到要处理的数据。这样也可行,但逻辑比较绕:`process_data` 既负责等待下载,又负责处理数据。

© www.soinside.com 2019 - 2024. All rights reserved.