如何从非异步方法读取异步流?

问题描述 投票:0回答:1

我使用 FastAPI 来创建应用程序。功能之一是使用 PUT 请求上传文件。 Fast API 支持使用 POST 请求(多部分形式)上传文件。 但我需要 PUT 请求,其中文件只是请求的正文。

我发现我可以使用Request对象,并且有stream()方法来访问请求主体。

但是,我在异步代码方面遇到了问题。我的其余代码不是异步的。最初是 WSGI 应用程序。

@f_app.put("/share/{share_name}/file/upload")
def upload_file(request: Request, share_name:str):
    path = request.headers.get("x-file-path","")

    request.state.api_app.router.input.with_stream(request.stream())
    
    return UploadAPIEndpoint(request.state.api_app).action("put",
                            share_name = share_name, path = path)

此代码失败并出现错误

'async_generator' object has no attribute 'read'

在这段代码与 WSGI 一起使用之前

request.state.api_app.router.input.with_stream(environment['wsgi.input'])

在该方法内部,对该对象执行 read() 操作(带有 with_stream() 参数)

request.stream() 在 FastAPI 中是“异步”的。

如何在不将代码在很多地方更改为异步的情况下解决这个问题?

也许可以有一些额外的类可以像异步流的包装器一样工作? 或者也许可以使用“通道”或“队列”等一些技巧来运行两个并行协程?将从异步流中读取并放入队列/通道,我的主代码将从该共享队列/通道中读取?

你有解决这个问题的例子吗?

python asynchronous async-await python-asyncio fastapi
1个回答
0
投票

我没有找到“通用”的解决方案。 但是,我已经找到了如何使用 FastAPI 异步流和我现有的“同步”代码解决这个特定问题。

最后,代码如下

@f_app.put("/share/{share_name}/file/upload")
async def upload_file(request: Request, share_name:str):
    path = request.headers.get("x-file-path","")

    upload_obj = UploadAPIEndpoint(request.state.api_app)    

    # Get special "stream receiver" object
    data_receiver = upload_obj.get_stream_receiver(share_name, path)

    async for chunk in request.stream():
        data_receiver.write_to_stream(chunk)
    
    # Do final job
    return upload_obj.finish_upload()
© www.soinside.com 2019 - 2024. All rights reserved.