I have code that creates a zip archive of files and streams it back. The problem is that for large requests this can mean several minutes of processing before any data is transferred, which makes cancelling the request unreliable because Python just keeps running. Ideally I would yield each zipped file one at a time from a generator function, with the user still receiving the whole zip archive, so that cancellation of the request is more robust.
I have a minimal working example. If the function yields a single file (N=1), it works fine. If you request 2 or more (N>=2), the zip is corrupted. Here is the example:
# test endpoint
from fastapi import APIRouter, Path
from numpy import random
import io, zipfile
from fastapi.responses import StreamingResponse
router = APIRouter(tags=["Make files"])
# make some fake data and zip, then yield
def make_data(N):
"""
Make fake data.
"""
CHUNK_SIZE = 1024*1024
for n in range(N):
content = random.random(100)
name = f'{n:02}.txt'
# Create new in-memory zip file for each file
s = io.BytesIO()
with zipfile.ZipFile(s, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=2) as zf:
# Add file content to the in-memory zip file
zf.writestr(name, content)
# Seek to the beginning of the in-memory zip file
s.seek(0)
# Yield the content of the in-memory zip file for the current file
while chunk := s.read(CHUNK_SIZE):
yield chunk
# streamingresponse
def stream_data(N):
"""
Stream the files.
"""
return StreamingResponse(
make_data(N), media_type="application/zip",
headers={"Content-Disposition": f"attachment; filename=download.zip"})
# endpoint
@router.get("/{N}")
async def yield_files(
        N: int = Path(..., description="number of random files to make")):
    return stream_data(N)
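For completeness, this is roughly how I mount the router when reproducing the problem (the module and app names are just placeholders):

# minimal app used for testing; module/app names are placeholders
from fastapi import FastAPI

app = FastAPI()
app.include_router(router)
# run with: uvicorn main:app, then request /1 (opens fine) and /3 (corrupted download.zip)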
Is there a simple change that would make this script work?
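My guess is that each loop iteration writes a complete, self-contained archive with its own central directory, so simply concatenating those chunks does not produce one valid zip. The direction I have in mind is to keep a single ZipFile open for the whole response and have it write into a small unseekable buffer that I drain after each file. A rough, untested sketch of what I mean (the _ChunkBuffer class and make_data_single_archive are names I made up for illustration, and I convert the array to a string so writestr receives text):

# sketch: one ZipFile spanning the whole stream, drained after each file
import zipfile
from numpy import random

class _ChunkBuffer:
    """Write-only, unseekable buffer that ZipFile writes into and we drain."""
    def __init__(self):
        self._chunks = []
    def write(self, b):
        self._chunks.append(bytes(b))
        return len(b)
    def drain(self):
        data = b"".join(self._chunks)
        self._chunks.clear()
        return data

def make_data_single_archive(N):
    buf = _ChunkBuffer()
    # buf is unseekable, so zipfile falls back to data-descriptor records
    # and writes a single central directory only when the archive is closed
    with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=2) as zf:
        for n in range(N):
            zf.writestr(f'{n:02}.txt', str(random.random(100)))
            yield buf.drain()  # stream the bytes written for this file
    yield buf.drain()  # central directory, written when the ZipFile closes

Is something along these lines the right direction, or is there a simpler change to the original script?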